I have subclassed multiprocess.Process (multiprocess is a fork of the multiprocessing library) like so:
# source/process.py
import time
import traceback

import multiprocess

# S_Logger is the project's own logging helper (its import is not shown).


class Process(multiprocess.Process):
    def __init__(self, test_name: str, *args, **kwargs) -> None:
        multiprocess.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocess.Pipe()
        self._exception = None
        self._test_name = test_name

    def run(self) -> None:
        # Set these up before the try block so that the except/finally
        # clauses can always reference them.
        start = time.perf_counter()
        logger = S_Logger(self._test_name).get_logger()
        try:
            logger.info('EXECUTION OF %s HAS STARTED.', self._test_name)
            multiprocess.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            logger.error(f'EXCEPTION OCCURRED: {tb}')
            self._child_conn.send((e, tb))
        finally:
            logger.info('EXECUTION OF %s HAS ENDED.', self._test_name)
            end = time.perf_counter()
            logger.info(f'FINISHED in {round(end-start, 2)} second(s)')
When I create a normal Process using this class, everything works perfectly, including logging.
Now I want to create a process pool of such customized processes, but I ran into a problem with respawning them after their lifetime comes to an end. Here is how I create the pool, with the additional maxtasksperchild=1 argument:
import time

import multiprocess

from source.process import Process

ctx = multiprocess.get_context()


def run_tests(self):
    def worker(x):
        print(x**2)
        time.sleep(1)

    with ctx.Pool(processes=4, maxtasksperchild=1) as pool:
        nums = range(10)
        ctx.Process = Process(test_name='test_name')
        pool.map(worker, nums)
This produces the following output:
0
1
4
9
Exception in thread Thread-1 (_handle_workers):
Traceback (most recent call last):
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 513, in _handle_workers
    cls._maintain_pool(ctx, Process, processes, pool, inqueue,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 337, in _maintain_pool
    Pool._repopulate_pool_static(ctx, Process, processes, pool,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 319, in _repopulate_pool_static
    w = Process(ctx, target=worker,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 181, in Process
    return ctx.Process(*args, **kwds)
TypeError: 'Process' object is not callable
This raises two questions:
- Why is there no logging? If I don't use a pool, the logs appear correctly.
- Why, after the first four processes have run, can the new ones that should be respawned not be created (the "not callable" error)? If I remove the maxtasksperchild argument, it works perfectly (0, 1, 4, 9, 16, 25...).
The error occurs because you are replacing ctx.Process (a class) with an instance of your own subclass. Instances are not callable unless their class defines a __call__ method. But even if you replaced it with your subclass itself, it wouldn't work: you would get a recursion or attribute error, since you are replacing a class with a subclass of that same class.
As for the missing logs, that is because you never really successfully patched the pool to use your subclass of Process, so its run method (and the logging in it) never executes. This also ties into your second question (read on).
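The difference between a class and an instance of it can be seen in isolation. The sketch below uses a hypothetical stand-in class, Worker, rather than the real Process, and reproduces the same kind of TypeError that the pool raised:

```python
class Worker:
    """Hypothetical stand-in for a Process subclass (not the real class)."""

    def __init__(self, test_name=None):
        self.test_name = test_name


factory = Worker                 # the class itself: calling it builds an instance
instance = Worker("test_name")   # an instance: Worker defines no __call__

print(callable(factory))    # True
print(callable(instance))   # False

try:
    instance()  # what the pool effectively does via ctx.Process(*args, **kwds)
except TypeError as exc:
    message = str(exc)
    print(message)  # 'Worker' object is not callable
```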
The reason this happens is that the pool creates its worker processes as soon as it is constructed, that is, on the line with ctx.Pool(processes=4, maxtasksperchild=1) as pool. Since you apply your patch after the processes have started, it has no effect unless the pool has to start processes again, and this is where maxtasksperchild comes in. If you provide maxtasksperchild, the pool will attempt to start another process once a worker has used up its tasks, but because of the faulty patch, that raises an error. If you don't set maxtasksperchild, the pool never looks at the patch you applied, since it doesn't have to start a process again.
Regardless, here's a better patch to do what you want:
Note how test_name is now a keyword argument and also optional. This is to make it work with functools.partial. You probably want to add checks that the value is passed and is valid.
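One way to add such a check, sketched with the standard-library multiprocessing module standing in for multiprocess. CheckedProcess and its error message are illustrative names, not part of either library:

```python
import functools
import multiprocessing as mp  # assumption: stand-in for the multiprocess fork

_BaseProcess = mp.get_context().Process


class CheckedProcess(_BaseProcess):
    """Hypothetical Process subclass that validates test_name up front."""

    def __init__(self, *args, test_name=None, **kwargs):
        # test_name stays an optional keyword in the signature so that
        # functools.partial can supply it, but a usable value is required.
        if not isinstance(test_name, str) or not test_name.strip():
            raise ValueError("test_name must be a non-empty string")
        super().__init__(*args, **kwargs)
        self._test_name = test_name


# The factory the pool would call: the name is bound once, here.
make_process = functools.partial(CheckedProcess, test_name="smoke_test")
proc = make_process(target=print, args=("hello",))
print(proc._test_name)  # smoke_test

try:
    CheckedProcess(target=print)  # test_name was never supplied
except ValueError as exc:
    error = str(exc)
    print(error)  # test_name must be a non-empty string
```

Failing in the constructor means a missing name surfaces in the parent process as soon as a worker is created, instead of later inside the child's run method.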