Zombie processes, here we go again


I'm struggling a lot with multiprocessing/threading/subprocess. What I'm basically trying to do is execute every single binary available on my computer; I wrote a Python script to do so. But I keep getting zombie processes ("defunct"), which end up in a deadlock if all 4 of my workers are in this state. I've tried lots of different things, but nothing seems to work :(

Here's what the architecture looks like:

|   \_ python -m dataset --generate
|       \_ worker1
|       |   \_ [thread1] firejail bin1
|       \_ worker2
|       |   \_ [thread1] firejail bin1
|       |   \_ [thread2] firejail bin2
|       |   \_ [thread3] firejail bin3
|       \_ worker3
|       |   \_ [thread1] [firejail] <defunct>
|       \_ worker4
|       |   \_ [thread1] [firejail] <defunct>

There are 4 workers, which I create like this:

from multiprocessing import get_context

# spawn mode prevents deadlocks https://codewithoutrules.com/2018/09/04/python-multiprocessing/
with get_context("spawn").Pool() as pool:

    results = []

    for binary in binaries:
        result = pool.apply_async(legit.analyse, args=(binary,),
                                  callback=_binary_analysis_finished_callback,
                                  error_callback=error_callback)
        results.append(result)

(Note I use a "spawn" pool, but now I'm wondering if it's of any use...)

Each worker will create multiple threads like this:

from threading import Thread

threads = []
executions = []

def thread_wrapper(*args):
    flows, output, returncode = _exec_using_firejail(*args)
    executions.append(Execution(*args, flows, is_malware=False))

for command_line in potentially_working_command_lines:
    thread = Thread(target=thread_wrapper, args=(command_line,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

And each thread will start a new process in the firejail sandbox:

import os
import signal
import subprocess

process = subprocess.Popen(FIREJAIL_COMMAND +
                           ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] + command_line,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)

try:
    out, errs = process.communicate(timeout=5, input=b"Y\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\n")
    # print("stdout:", out)
    # print("stderr:", errs)

except subprocess.TimeoutExpired:
    # print(command_line, "timed out")
    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
    out, errs = process.communicate()

I use os.killpg() and not process.kill() because for some reason the subprocesses of my Popen process are not killed otherwise... This is possible thanks to preexec_fn=os.setsid, which starts the child in a new session and process group that its descendants inherit. But even with this method, some processes such as zsh still end up as zombies, because it looks like zsh changes its process group, so my os.killpg() doesn't reach it as expected...
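For context, the group-independent alternative is to walk the process tree by parentage, so children that switch groups are still found. A minimal sketch, assuming the third-party psutil package (nothing in the script above uses it):

import psutil

def kill_process_tree(pid):
    """Kill a process and all of its descendants, found by
    parentage rather than by process group."""
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return
    # children(recursive=True) walks the tree, so a child that
    # changed its process group (like zsh) is still found
    procs = parent.children(recursive=True) + [parent]
    for p in procs:
        try:
            p.kill()  # SIGKILL on POSIX
        except psutil.NoSuchProcess:
            pass  # already gone
    # wait so the killed processes get reaped and don't stay <defunct>
    psutil.wait_procs(procs, timeout=5)

The caveat: a child that daemonizes and reparents to init escapes the tree walk, so even this is not a 100% guarantee.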

I'm looking for a way to be 100% sure that all processes will be dead.

1 Answer

Answered by James:

If you want to use the subprocess module for this, you should use the .kill method of the process object directly instead of using the os module. Using communicate is a blocking call, so Python will wait for a response. Using the timeout parameter helps, but it will be slow for lots of processes.

import os
import subprocess

cmd_list = (
    FIREJAIL_COMMAND
    + ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"]
    + command_line
)
proc = subprocess.Popen(
    cmd_list,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    preexec_fn=os.setsid
)

try:
    out, errs = proc.communicate(timeout=5, input=b"Y\n" * 16)
except subprocess.TimeoutExpired:
    proc.kill()
    out, errs = None, None

# wait() reaps the child so it doesn't linger as <defunct>
ret_code = proc.wait()
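Worth spelling out: a killed process shows up as <defunct> until its parent reaps it, which is exactly what the final wait() (or a communicate() call) does, so don't skip it after kill().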

If you want to run a non-blocking loop over a set of processes, that is when you use poll(). Here is an example. It assumes you have a list of filenames and corresponding command_lines that you want to feed into process creation.

import os
import subprocess
import time

def create_process(output_filename, command_line):
    cmd_list = (
        FIREJAIL_COMMAND
        + ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"]
        + command_line
    )
    proc = subprocess.Popen(
        cmd_list,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        preexec_fn=os.setsid
    )
    return {proc: (output_filename, command_line)}

# map each Popen object to its (output_filename, command_line)
proc_info = {}
for f, c in zip(filenames, command_lines):
    proc_info.update(create_process(f, c))
processes = list(proc_info)

TIMEOUT = 5
WAIT = 0.25  # how long to wait between checking the processes
finished = []
for _ in range(round(TIMEOUT / WAIT)):
    finished_new = []
    if not processes:
        break
    for proc in processes:
        # poll() returns None while the process is still running
        if proc.poll() is not None:
            finished_new.append(proc)
    # cleanup
    for proc in finished_new:
        processes.remove(proc)
    finished.extend(finished_new)
    time.sleep(WAIT)
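One gap the loop leaves open: anything still in processes once the timeout budget is spent has neither been killed nor reaped. A minimal follow-up sketch for the stragglers (the group kill mirrors the setsid approach from the question):

import signal

# anything that survived the timeout loop still needs to be
# killed and then reaped, or it may linger as <defunct>
for proc in processes:
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)  # kill the whole setsid group
    except ProcessLookupError:
        pass  # it exited between the last poll() and now
    proc.wait()  # reap the child so no zombie is left behind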