I am looking to parallelise numpy or pandas operations. For this I have been looking into pydata's blaze. My understanding was that seamless parallelisation was its major selling point.
Unfortunately I have been unable to find an operation that runs on more than one core. Is parallel processing in blaze available yet or currently only a stated aim? Am I doing something wrong? I am using blaze v0.6.5.
Here is an example of one function I was hoping to parallelise (deduplication of a PyTables column too large to fit in memory):
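import pandas as pd
import blaze as bz

# store is assumed to be an open pd.HDFStore that contains the 'my_names' table,
# e.g. store = pd.HDFStore('names.h5')  (the filename here is hypothetical)

def f1():
    counter = 0
    groups = pd.DataFrame(columns=['name'])
    # blaze expression: distinct rows of a one-column table of strings
    t = bz.TableSymbol('t', '{name: string}')
    e = bz.distinct(t)
    # read the column in chunks of 100,000 rows (pass chunksize as an int)
    for chunk in store.select('my_names', columns=['name'],
                              chunksize=100000):
        counter += 1
        print('processing chunk %d' % counter)
        # append the new chunk to the running result, then deduplicate again
        groups = pd.concat([groups, chunk])
        groups = bz.compute(e, groups)
    return groups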
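For comparison, here is a minimal sketch of the same chunk-wise deduplication using only pandas, with no blaze involved. It assumes each chunk is a DataFrame with a 'name' column, which is what store.select(..., chunksize=...) yields; make_chunks is a hypothetical stand-in for the HDF5 store so the example runs on its own. It is still single-core: it only keeps the set of unique names in memory instead of re-concatenating DataFrames.

```python
import pandas as pd


def make_chunks(values, chunksize):
    """Hypothetical stand-in for store.select(..., chunksize=...):
    yields successive DataFrame chunks with a single 'name' column."""
    for start in range(0, len(values), chunksize):
        yield pd.DataFrame({'name': values[start:start + chunksize]})


def distinct_names(chunks):
    """Deduplicate a column that arrives in chunks.

    Only the unique values seen so far are kept in memory, so the
    working set is bounded by the number of distinct names, not rows.
    """
    seen = set()
    for chunk in chunks:
        # unique() drops duplicates within the chunk before merging
        seen.update(chunk['name'].unique())
    # sorted only to make the output deterministic
    return pd.DataFrame({'name': sorted(seen)})


names = ['bob', 'alice', 'bob', 'carol', 'alice', 'dave', 'carol']
result = distinct_names(make_chunks(names, chunksize=3))
print(result['name'].tolist())  # ['alice', 'bob', 'carol', 'dave']
```

Unlike drop_duplicates, this does not preserve the order of first occurrence; the sort is there just so the result is reproducible.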
Edit 1
I have had problems following Phillip's examples:
In [1]: from blaze import Data, compute
In [2]: d = Data('test.bcolz')
In [3]: d.head(5)
Out[3]: <repr(<blaze.expr.collections.Head at 0x7b5e300>) failed: NotImplementedError: Don't know how to compute:
expr: _1.head(5).head(11)
data: {_1: ctable((8769257,), [('index', '<i8'), ('date', 'S10'), ('accessDate', 'S26')])
nbytes: 367.97 MB; cbytes: 35.65 MB; ratio: 10.32
cparams := cparams(clevel=5, shuffle=True, cname='blosclz')
rootdir := 'test.bcolz'
[(0L, '2014-12-12', '2014-12-14T17:39:19.716000')
(1L, '2014-12-11', '2014-12-14T17:39:19.716000')
(2L, '2014-12-10', '2014-12-14T17:39:19.716000') ...,
(1767L, '2009-11-11', '2014-12-15T13:32:39.906000')
(1768L, '2009-11-10', '2014-12-15T13:32:39.906000')
(1769L, '2009-11-09', '2014-12-15T13:32:39.906000')]}>
My environment:
C:\Anaconda>conda list blaze
# packages in environment at C:\Anaconda:
#
blaze 0.6.8 np19py27_69
Note, however, that blaze seems to report the wrong version:
In [5]: import blaze
In [6]: blaze.__version__
Out[6]: '0.6.7'
With other data sources blaze seems to work:
In [6]: d = Data([1,2,2,2,3,4,4,4,5,6])
In [7]: d.head(5)
Out[7]:
_2
0 1
1 2
2 2
3 2
4 3
In [16]: list(compute(d._2.distinct()))
Out[16]: [1, 2, 3, 4, 5, 6]
Note: The example below requires the latest version of blaze, which you can get via:

You'll also need the latest version of the nascent "into" project. You'll need to install "into" from master, which you can do with:

You can't do "seamless" parallelization with an arbitrary backend, but the bcolz backend supports parallelization in a nice way. Here's an example with the NYC Taxi trip/fare dataset.

Note: I've combined both the trip and fare datasets into a single dataset. There are 173,179,759 rows in the dataset.
To add process-based parallelism, we bring in the Pool class from the multiprocessing stdlib module and pass the Pool instance's map method as a keyword argument to compute:

So, roughly a 3x speedup for an extra line of code. Note that this is a string column, and string columns tend to be very inefficient compared to other types. A distinct expression computed over an integer column finishes in about 1 second with multiple cores (vs. 3 seconds on a single core), so about the same improvement in running time:
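Passing pool.map works because a chunked distinct is a map/reduce: each chunk can be deduplicated independently and the partial results merged afterwards. The sketch below illustrates that idea in plain Python with a hypothetical chunked_distinct function that, like the compute call described above, accepts a map argument. It is an illustration of the pattern, not blaze's implementation. Passing the builtin map runs serially; passing multiprocessing.Pool(...).map spreads the per-chunk work across processes. The worker function is defined at module level so that it can be pickled and sent to the worker processes.

```python
from multiprocessing import Pool


def chunk_distinct(chunk):
    """Map step: deduplicate one chunk independently of the others."""
    return set(chunk)


def chunked_distinct(data, chunksize, map=map):
    """Hypothetical illustration (not blaze's API): distinct values of
    `data`, computed chunk by chunk with a pluggable `map` function."""
    chunks = [data[i:i + chunksize] for i in range(0, len(data), chunksize)]
    # Map: each chunk is processed independently, possibly in another process
    partials = map(chunk_distinct, chunks)
    # Reduce: merge the per-chunk results in the parent process
    result = set()
    for part in partials:
        result |= part
    return sorted(result)


data = [1, 2, 2, 2, 3, 4, 4, 4, 5, 6] * 1000

# Serial: the builtin map
serial = chunked_distinct(data, chunksize=1000)

if __name__ == '__main__':
    # Parallel: same call, only the map function is swapped for the pool's
    with Pool(processes=2) as pool:
        parallel = chunked_distinct(data, chunksize=1000, map=pool.map)
    print(serial == parallel)  # True
```

The design choice is that the computation never needs to know whether it runs serially or in parallel; the caller decides by choosing which map to pass in, which is why adding parallelism costs only one extra line.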