How to send an object with a multiprocessing.Pipe member through a ProcessPoolExecutor in Python 3.10

129 views Asked by At

I'm trying to write a progress bar class that supports multiprocessing and is displayed in a terminal (OS-agnostic). I'm aware of packages like tqdm that support this, and I have used them before. However, this is for a project where I want to use only the standard library, so no "pip install x" solutions: it has to work on a fresh install of Python 3.10 or later.

A stripped down version of the class looks like this:

class ProgressBar:
    """Terminal progress bar intended to be advanced from several processes (excerpt).

    If ``pipe_rec_connection`` is given, a separate listener process is started
    that applies the counts received on that connection. ``multiprocess_update``
    sends counts through ``pipe_send_connection``.
    """
    def __init__(self, total: int, count=0, pipe_rec_connection=None, pipe_send_connection=None):
        self.total = total
        self.count = count
        self.pipe_rec_connection = pipe_rec_connection
        self.pipe_send_connection = pipe_send_connection

        # The listener runs process_listener in its own process, so it advances
        # its own copy of self.count, not the one in this process.
        if self.pipe_rec_connection :
            self.progress_listener = multiprocessing.Process(target=self.process_listener)
            self.progress_listener.start()

    def print(self) -> None:
        """Render the bar (implementation elided in this excerpt)."""
        # Some string manipulation and then calling print()

    def update(self, count=1) -> None:
        """Advance the bar by *count* steps and redraw it."""
        self.count += int(count)
        self.print()

    def process_listener(self) -> None:
        """Listener loop: apply counts received on the pipe until total is reached."""
        while self.count < self.total:
            self.update(self.pipe_rec_connection.recv())

    def multiprocess_update(self, count: int) -> None:
        """Send *count* steps through the sending end of the pipe."""
        self.pipe_send_connection.send(count)

    def __getstate__(self):
        """Return a shallow copy of the instance dict as the pickled state."""
        # NOTE(review): this copy includes progress_listener (a
        # multiprocessing.Process) when one was started. multiprocessing refuses
        # to pickle a Process's auth key outside of process start-up, which would
        # make pickling fail when the bar is sent to a ProcessPoolExecutor.
        # Consider dropping it from the state — confirm.
        return self.__dict__.copy()

    def __setstate__(self, state):
        """Restore attributes from the pickled *state* dict."""
        self.__dict__.update(state)

I've tried testing the class like this:

def process(progress_bar: "ProgressBar", max_delay: int = 2):
    """Simulate a unit of work, then report one step of progress.

    Args:
        progress_bar: Bar whose ``multiprocess_update`` forwards the step.
        max_delay: Upper bound, in whole seconds, of the random simulated
            work time (the original hard-coded value was 2).
    """
    time.sleep(random.randint(0, max_delay))
    progress_bar.multiprocess_update(1)


if __name__ == '__main__':
    conn_1, conn_2 = Pipe()
    p = ProgressBar(100, pipe_rec_connection=conn_1, pipe_send_connection=conn_2)
    p.update(0)
    time.sleep(1)
    try:
        with ProcessPoolExecutor(max_workers=4) as executor:
            # Drain the results. map() stores any failure in the task's future,
            # including a failure to pickle the arguments, and raises it only
            # when that result is retrieved. An unconsumed map() hides it.
            for _ in executor.map(process, [p] * 100):
                pass
    except BaseException:
        # The listener blocks on recv() until it has counted `total` steps.
        # If the tasks failed, those steps never arrive, and the non-daemon
        # listener would keep the interpreter from exiting.
        p.progress_listener.terminate()
        raise
    # Let the listener print the final count before 'Done'.
    p.progress_listener.join()
    print('\nDone')

However, on closer inspection, it seems that process() never runs when the ProgressBar is passed as an argument. As a result, the progress bar is stuck at 0 and no error message is shown.

Here is a solution that works. It is not optimal, because it relies on a variable that is not tied to a class instance:

def process(pipe, max_delay: int = 2):
    """Simulate a unit of work, then send one progress step through *pipe*.

    Args:
        pipe: Object with a ``send`` method, here the sending end of the
            Pipe whose receiving end the progress listener reads.
        max_delay: Upper bound, in whole seconds, of the random simulated
            work time (the original hard-coded value was 2).
    """
    time.sleep(random.randint(0, max_delay))
    pipe.send(1)


if __name__ == '__main__':
    conn_1, conn_2 = Pipe()
    p = ProgressBar(100, pipe_rec_connection=conn_1)
    p.update(0)
    time.sleep(1)
    try:
        with ProcessPoolExecutor(max_workers=4) as executor:
            # Drain the results. map() raises a task's exception only when its
            # result is retrieved, so an unconsumed map() hides failures.
            for _ in executor.map(process, [conn_2] * 100):
                pass
    except BaseException:
        # Without all `total` steps, the listener would block on recv()
        # forever and, being non-daemon, keep the interpreter alive.
        p.progress_listener.terminate()
        raise
    # Let the listener print the final count before 'Done'.
    p.progress_listener.join()
    print('\nDone')

So my issue boils down to this: is it even possible to pass an instance of the class containing the Pipe connection as an argument through a ProcessPoolExecutor? If not, is there a better solution than passing the Pipe connection itself as an argument? Ideally, the Pipe connection should be tied to a class instance.

Edit: here is a minimal reproducible example, as requested. To try the variant where ProgressBar is passed as an argument, comment out pipe.send(1), then uncomment progress_bar.multiprocess_update(1) and p_list = [p] * 100.

import time
import random
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


class ProgressBar:
    """Terminal progress bar that can be advanced from worker processes.

    When ``pipe_rec_connection`` is given, a listener process is started. It
    reads step counts from that connection and redraws the bar. Workers
    advance the bar by calling ``multiprocess_update``, which writes to
    ``pipe_send_connection``.

    The listener runs in its own process and therefore owns its own copy of
    ``count``. ``count`` in the creating process is not advanced by
    ``multiprocess_update``.
    """

    def __init__(self, total: int, count=0, pipe_rec_connection=None, pipe_send_connection=None):
        self.total = total
        self.count = count
        # Receiving end: read by process_listener in the listener process.
        self.pipe_rec_connection = pipe_rec_connection
        # Sending end: written by multiprocess_update (typically in a worker).
        self.pipe_send_connection = pipe_send_connection

        if self.pipe_rec_connection:
            self.progress_listener = multiprocessing.Process(target=self.process_listener)
            self.progress_listener.start()

    def print(self) -> None:
        """Redraw the bar in place as ``count/total``."""
        # '\r' returns to the start of the line so each redraw overwrites the last.
        print(f'\r{self.count}/{self.total}', end='')

    def update(self, count=1) -> None:
        """Advance the bar by *count* steps and redraw it."""
        self.count += int(count)
        self.print()

    def process_listener(self) -> None:
        """Listener loop: apply counts received on the pipe until total is reached."""
        # recv() blocks until some process sends a count.
        while self.count < self.total:
            self.update(self.pipe_rec_connection.recv())

    def multiprocess_update(self, count: int) -> None:
        """Send *count* steps to the listener through the pipe."""
        self.pipe_send_connection.send(count)

    def __getstate__(self):
        """Return the picklable state, excluding the listener's Process handle.

        Instances are pickled when passed to a ProcessPoolExecutor (and when
        the listener is spawned). A ``multiprocessing.Process`` carries an
        authentication key that multiprocessing refuses to pickle outside of
        process start-up. Keeping it in the state therefore made every
        submitted task fail with TypeError, and that error stays hidden until
        the task's result is retrieved. Copies only need the pipe connections;
        the listener is managed from the creating process.
        """
        state = self.__dict__.copy()
        state.pop('progress_listener', None)
        return state

    def __setstate__(self, state):
        """Restore attributes from *state*; copies carry no listener handle."""
        self.__dict__.update(state)


def process(pipe, progress_bar=None, max_delay: int = 2):
    """Simulate a unit of work, then report one progress step through *pipe*.

    Args:
        pipe: Object with a ``send`` method, here the sending end of the
            Pipe whose receiving end the progress listener reads.
        progress_bar: ProgressBar for the alternative variant below; unused
            while that line stays commented out.
        max_delay: Upper bound, in whole seconds, of the random simulated
            work time (the original hard-coded value was 2).
    """
    time.sleep(random.randint(0, max_delay))
    pipe.send(1)
    # Alternative: report through the ProgressBar instead of the raw pipe.
    #progress_bar.multiprocess_update(1)


if __name__ == '__main__':
    conn_1, conn_2 = multiprocessing.Pipe()
    p = ProgressBar(100, pipe_rec_connection=conn_1, pipe_send_connection=conn_2)
    p.update(0)
    time.sleep(1)
    p_list = [None] * 100
    # p_list = [p] * 100
    try:
        with ProcessPoolExecutor(max_workers=4) as executor:
            # Drain the results. map() stores any failure in the task's future,
            # including a failure to pickle the arguments, and raises it only
            # when that result is retrieved. An unconsumed map() hides it.
            for _ in executor.map(process, [conn_2] * 100, p_list):
                pass
    except BaseException:
        # The listener blocks on recv() until it has counted `total` steps.
        # If the tasks failed, those steps never arrive, and the non-daemon
        # listener would keep the interpreter from exiting.
        p.progress_listener.terminate()
        raise
    # Let the listener print the final count before 'Done'.
    p.progress_listener.join()

    print('\nDone')
0

There are 0 answers