Execution takes too long to finish when using Pool.starmap


I'm executing a parallelized function using Pool.starmap. According to the tqdm library, the function itself only takes 6.5 minutes to execute, but the program keeps running for about 20 more minutes before it finishes. The function processes and applies filters to strings in some columns of a pandas DataFrame. Would a different parallelization approach perform better? Is there something wrong with the starmap function?

Function to be executed:

def get_best_string_filters(hst, apolnmar, apolnmod, apolnsub, apolnterm, amodnanu, ps, cc, cilindros, combustible, gearbox, year, search_model, search_version, search_container):
    select = table_ecode[(table_ecode.HST == hst)]
    
    year = int(year[-4:])
    
    select = initial_selection(select, ps, cc, cilindros, combustible, gearbox, year)
    
    temp = get_starting_selection(select.copy(), search_model, "HTB")
    if temp.empty:
        search_model, search_version, search_container = find_best_combination(select, search_model, search_version, search_container)
    else:
        select = temp.copy()
        _, search_version, search_container = find_best_combination(select, "", search_version, search_container)
    
    #print(search_model, search_version, search_container)
    
    return [apolnmar, apolnmod, apolnsub, apolnterm, amodnanu, search_model, search_version, search_container]

starmap call:

if not exists("dict_search_ammo_make_version_fixed.npy"):
    params = [tuple(row) for row in values_to_change.values]  # one argument tuple per row
    with Pool(mp.cpu_count()) as ex:
        array_split_ammo_make_version = ex.starmap(get_best_string_filters, tqdm(params, total=len(params)))
        dict_split_ammo_make_version = array_to_dict(array_split_ammo_make_version)
        # save the dict to disk for faster future executions
        np.save("dict_search_ammo_make_version_fixed.npy", dict_split_ammo_make_version)
else:
    dict_split_ammo_make_version = np.load('dict_search_ammo_make_version_fixed.npy', allow_pickle=True).item()

tqdm reports 6.5 minutes and a completed status, but the script continues to run for another 20 minutes: (execution screenshot)

1 Answer

Booboo (accepted answer):

In the demos below, the generator function params simulates slowly generating arguments for the worker function foo. foo simply returns what it is passed: a single list when using imap, or individual arguments unpacked from a list when using apply_async. The key point is that starmap consumes the entire argument iterable before dispatching any work, so a tqdm wrapped around the input finishes as soon as the arguments have been read, not when the tasks complete. Wrapping the result iterator of imap (or updating the bar from an apply_async callback) tracks actual task completion instead.

Using imap

import time

def foo(the_list):
    time.sleep(10)
    return the_list

if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        for i in range(1, 9):
            time.sleep(1)
            yield list(range(i))

    with Pool() as ex:
        it = ex.imap(foo, params())
        results = list(tqdm(it, total=8))
    print(results)

Using apply_async

import time

def foo(*args):
    time.sleep(10)
    return args

if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        for i in range(1, 9):
            time.sleep(1)
            yield list(range(i))

    def my_callback(result):
        bar.update(1)

    with Pool() as ex, tqdm(total=8) as bar:
        async_results = [ex.apply_async(foo, param, callback=my_callback) for param in params()]
        results = [async_result.get() for async_result in async_results]
    print(results)

imap with fixed-size tuples

import time

def foo(tpl):
    time.sleep(10)
    # unpack:
    a, b, c, d, e, f, g, h = tpl
    return (a + b) * (c + d) * (e + f) * (g + h)


if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        for i in range(1, 9):
            time.sleep(1)
            yield list(range(8))

    with Pool() as ex:
        it = ex.imap(foo, params())
        results = list(tqdm(it, total=8))
    print(results)