Read multiple csv files in a pandas dataframe, in parallel


This question builds on the previous one:

Read multiple csv files in a pandas dataframe

Basically, I have a set of files like:

file 1:

<empty line>
#-----------------------------------------
#    foo  bar  baz  
#-----------------------------------------
    0.0120932       1.10166       1.08745 
    0.0127890       1.10105       1.08773 
    0.0142051       1.09941       1.08760 
    0.0162801       1.09662       1.08548 
    0.0197376       1.09170       1.08015 

file 2:

<empty line>
#-----------------------------------------
#    foo  bar  baz  
#-----------------------------------------
    0.888085      0.768590      0.747961
    0.893782      0.781607      0.760417
    0.899830      0.797021      0.771219
    0.899266      0.799260      0.765859
    0.891489      0.781255      0.728892

etc. Each file is identified by an ID, and there's an ID-to-file mapping:

files = {'A': 'A.csv', 'B': 'B.csv'}

Thanks to the other answer, I can read the files serially:

import pandas as pd

columns = ['foo', 'bar', 'baz']
skip = 4
df = (pd.concat({k: pd.read_csv(v, skiprows=skip, sep=r'\s+', names=columns)
                 for k, v in files.items()},
                names=['ID'])
        .reset_index('ID')
        .reset_index(drop=True)
      )

However, I would like to read them in parallel, to take advantage of my multicore machine. A naive attempt doesn't work:

from pathlib import Path

import pandas as pd
from joblib import Parallel, delayed
from multiprocessing import cpu_count

n_jobs = cpu_count()


def read_file(res_dict: dict,
              skiprows: int,
              columns: list[str],
              id: str,
              file: Path
              ) -> None:
    res_dict[id] = pd.read_csv(file, skiprows=skiprows, sep=r'\s+', names=columns)


temp = {}
temp = Parallel(n_jobs)(delayed(read_file)(temp, skip, columns, id, file)
                        for id, file in files.items())
df = (pd.concat(temp,
                names=['ID'])
      .reset_index('ID')
      .reset_index(drop=True)
      )

I get the following error:

Traceback (most recent call last):
  File "/home/...py", line 54, in <module>
    df = (pd.concat(temp,
  File "/home/../.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 372, in concat
    op = _Concatenator(
  File "/home/../.venv/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 452, in __init__
    raise ValueError("All objects passed were None")
ValueError: All objects passed were None

Process finished with exit code 1

What am I doing wrong? Can you help me?

There are 2 answers

mozway (BEST ANSWER)

Here is a working version of your code. I didn't use a dictionary but returned the dataframes directly (since you already have the IDs as the keys of your mapping, master2file below). I passed the keys as a parameter to concat instead of concatenating a dictionary.

from pathlib import Path

import pandas as pd
from joblib import Parallel, delayed
from multiprocessing import cpu_count

n_jobs = cpu_count()

master2file = {'A': 'A.csv', 'B': 'B.csv'}
skip_rows = 4
columns = ['foo', 'bar', 'baz']

def read_file(skiprows: int,
              columns: list[str],
              id: str,
              file: Path
              ) -> pd.DataFrame:
    return pd.read_csv(file, skiprows=skiprows, sep=r'\s+', names=columns)


temp = Parallel(n_jobs)(delayed(read_file)(skip_rows, columns, id, file)
                        for id, file in master2file.items())
df = (pd.concat(temp,
                keys=list(master2file),
                names=['ID'])
      .reset_index('ID')
      .reset_index(drop=True)
      )

Output:

  ID       foo       bar       baz
0  A  0.012093  1.101660  1.087450
1  A  0.012789  1.101050  1.087730
2  A  0.014205  1.099410  1.087600
3  A  0.016280  1.096620  1.085480
4  A  0.019738  1.091700  1.080150
5  B  0.888085  0.768590  0.747961
6  B  0.893782  0.781607  0.760417
7  B  0.899830  0.797021  0.771219
8  B  0.899266  0.799260  0.765859
9  B  0.891489  0.781255  0.728892
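
If you prefer to keep the dictionary-based concat from the serial version, a minimal sketch (not part of the original answer) is to zip the returned frames back to their IDs. Parallel returns results in the order of its inputs, so the pairing is safe:

# Sketch (assumption): rebuild the {ID: frame} mapping from the parallel
# results, then concat it exactly as in the serial version.
results = Parallel(n_jobs)(delayed(read_file)(skip_rows, columns, id, file)
                           for id, file in master2file.items())
df = (pd.concat(dict(zip(master2file, results)), names=['ID'])
        .reset_index('ID')
        .reset_index(drop=True))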
DeltaIV

An alternative (most likely worse) attempt: instead of passing keys to concat, insert the ID as a regular column inside each worker, so a plain concat is enough.

from pathlib import Path

import pandas as pd
from joblib import Parallel, delayed
from multiprocessing import cpu_count

n_jobs = cpu_count()

files = {'A': 'A.csv', 'B': 'B.csv'}
skip_rows = 4
columns = ['foo', 'bar', 'baz']

def read_file(skiprows: int,
              columns: list[str],
              id: str,
              file: Path
              ) -> pd.DataFrame:
    df = pd.read_csv(file, skiprows=skiprows, sep=r'\s+', names=columns)
    df.insert(0, 'ID', id)
    return df


temp = Parallel(n_jobs)(delayed(read_file)(skip_rows, columns, id, file)
                        for id, file in files.items())
df = pd.concat(temp).reset_index(drop=True)
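
Because each worker tags its frame with the ID column, no keys/MultiIndex handling is needed after concat. For completeness, here is a minimal sketch of the same idea using only the standard library's ProcessPoolExecutor as a swap for joblib (an assumption, not taken from either answer):

# Sketch using concurrent.futures instead of joblib. Assumes the module-level
# read_file defined above, which must be picklable for the worker processes.
# (On spawn-based platforms, run this under an `if __name__ == "__main__":` guard.)
from concurrent.futures import ProcessPoolExecutor
from functools import partial

with ProcessPoolExecutor(max_workers=n_jobs) as pool:
    frames = list(pool.map(partial(read_file, skip_rows, columns),
                           files.keys(), files.values()))

df = pd.concat(frames).reset_index(drop=True)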