How can I chop up data before sending it into a multiprocessing function?


[Image: graph of smoothed and unsmoothed data]

import os
from multiprocessing import Pool

import pandas as pd


def smooth_data_mp(data_frame, n):  # data_frame is the data; n is the number of data points the smoother looks at
    num_processes = os.cpu_count() + 2
    chunk_size = len(data_frame.index) // num_processes
    print(chunk_size)
    end_chunk = len(data_frame) // chunk_size - 1  # subtract 1 because chunk indices start at 0
    with Pool(processes=num_processes) as pool:
        results = pool.starmap(process_data,
                               [(data_frame, i, chunk_size, n, end_chunk)
                                for i in range(len(data_frame) // chunk_size)])
    return pd.concat(results)


def process_data(dataframe, i, chunk_size, n, end_chunk):
    fraction = n / chunk_size  # n is the number of data points to look at in the smoothing func
    # Pad each chunk with n rows on either side so the loess fit lines up
    # at the seams; the first and last chunks can only be padded on one side.
    if i == 0:
        start_frame = 0
    else:
        start_frame = chunk_size * i - n
    if i == end_chunk:
        end_frame = len(dataframe)  # ensure end_frame doesn't run past the end of the data
    else:
        end_frame = chunk_size * (i + 1) + n
    new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame], fraction, i, n, end_chunk)

    # Reindex the (unpadded) smoothed chunk to its position in the original frame.
    start_index = chunk_size * i
    if i == end_chunk:
        end_index = len(dataframe)
    else:
        end_index = chunk_size * (i + 1)
    new_data_frame.index = pd.RangeIndex(start_index, end_index)
    return new_data_frame

How can I slice the data into chunks (the way I currently do inside process_data) before it's sent to each process? Right now every worker receives the entire dataframe, so I'm running into scaling problems with more processes and larger dataframes because of the memory overhead.

The reason I'm padding each chunk is that I'm running a smoothing function: if I don't add the extra n data points on each side of a chunk (not possible for the first and last chunks, but that's OK), I end up with gaps where the smoothed segments don't line up, because the loess fit doesn't take into account the points just outside its own chunk.
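
To make the overlap concrete, here is the boundary arithmetic pulled out of process_data as a standalone helper (chunk_bounds is just an illustrative name I'm giving it here, not something from my code):

def chunk_bounds(i, chunk_size, n, end_chunk, total_len):
    # Interior chunks get n extra rows on each side so the loess fit
    # sees its neighbours at the seams; the first and last chunks can
    # only be padded on one side.
    start = 0 if i == 0 else chunk_size * i - n
    end = total_len if i == end_chunk else chunk_size * (i + 1) + n
    return start, end

# e.g. total_len=1000, chunk_size=250, n=20 gives end_chunk = 1000 // 250 - 1 = 3:
# chunk_bounds(0, 250, 20, 3, 1000) -> (0, 270)
# chunk_bounds(1, 250, 20, 3, 1000) -> (230, 520)
# chunk_bounds(3, 250, 20, 3, 1000) -> (730, 1000)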


1 Answer

Answered by AceKijani:

I haven't actually implemented this yet, but moving the if/else boundary logic out of process_data should let me split the dataframe into a list of overlapping chunks up front, and change the iterable in the multiprocessing pool call to loop over those chunks. Then all each worker needs is its chunk and its index, and I can rebuild the full result afterwards.

I'll come back and update this thread with the code if it works in a bit.
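
In the meantime, here's an untested sketch of the idea (make_chunks and process_chunk are placeholder names I'm making up; it assumes calculate_loess_on_subset keeps the same signature as in the question and still trims the n-point padding off its result, as the original reindexing implies):

import os
from multiprocessing import Pool

import pandas as pd


def make_chunks(data_frame, chunk_size, n, end_chunk):
    # Build the overlapping slices up front, in the parent process,
    # so each worker only receives (and pickles) its own slice
    # instead of the whole dataframe.
    chunks = []
    for i in range(end_chunk + 1):
        start = 0 if i == 0 else chunk_size * i - n
        end = len(data_frame) if i == end_chunk else chunk_size * (i + 1) + n
        chunks.append(data_frame[start:end])
    return chunks


def process_chunk(chunk, i, chunk_size, n, end_chunk, total_len):
    # Same smoothing step as before, but on a pre-sliced chunk;
    # total_len is passed in because the worker no longer sees the full frame.
    fraction = n / chunk_size
    smoothed = calculate_loess_on_subset(chunk, fraction, i, n, end_chunk)
    start_index = chunk_size * i
    end_index = total_len if i == end_chunk else chunk_size * (i + 1)
    smoothed.index = pd.RangeIndex(start_index, end_index)
    return smoothed


def smooth_data_mp(data_frame, n):
    num_processes = os.cpu_count() + 2
    chunk_size = len(data_frame.index) // num_processes
    end_chunk = len(data_frame) // chunk_size - 1
    chunks = make_chunks(data_frame, chunk_size, n, end_chunk)
    with Pool(processes=num_processes) as pool:
        results = pool.starmap(
            process_chunk,
            [(chunk, i, chunk_size, n, end_chunk, len(data_frame))
             for i, chunk in enumerate(chunks)])
    return pd.concat(results)

Each task should now pickle only its own slice (roughly chunk_size + 2n rows) rather than the whole dataframe once per task, which is where the memory overhead was coming from.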