Parallel computation with two lines of code

Posted on Fri 19 May 2017 • Tagged with Python, Code Snippets

This is naive advice aimed at real beginners, but I'm sure I'll be copy-pasting snippets from here over and over again.

Let's imagine we need to take some shared data (e.g. a dataframe) and run a lot of similar computations on it. Looks like a nice candidate for parallel computation, right?

import numpy as np
import pandas as pd 

df = pd.DataFrame({'a': np.random.rand(1000), 'b': np.random.rand(1000)})

def process(df, k):
    """Some stupid, inefficient function"""
    i = 0
    for _, row in df.iterrows():
        if row['a'] > row['b'] * k:
            i += 1
    return i

result = [process(df, k) for k in np.random.rand(1000)]

Of course, the most straightforward way to get the result is a plain for loop (or a list comprehension, which is pretty similar). Is it fast? Not really...

In [28]: %timeit [process(df, k) for k in np.random.rand(1000)]
1 loop, best of 3: 55.8 s per loop

The easiest way to parallelize is to use multiprocessing.dummy.Pool for thread-based parallelism and multiprocessing.Pool for process-based parallelism. Let's start with threads:

In [29]: from multiprocessing.dummy import Pool

In [30]: %timeit Pool(4).map(lambda k: process(df, k),  np.arange(0, 1000))
1 loop, best of 3: 1min per loop

No speedup in this synthetic example: extra time is wasted on thread switching, and the threads can't actually execute the Python code in parallel because of the GIL.
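As an aside, thread-based pools do pay off when the heavy lifting happens outside the interpreter, e.g. inside NumPy's C routines, which release the GIL. A minimal sketch (my own illustration, not a measured run from this post):

import numpy as np
from multiprocessing.dummy import Pool

big = np.random.rand(1000, 1000)

def numpy_heavy(seed):
    """Matrix multiplication releases the GIL, so threads can overlap it"""
    rng = np.random.RandomState(seed)
    return (big @ rng.rand(1000, 1000)).sum()

with Pool(4) as pool:
    sums = pool.map(numpy_heavy, range(8))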
OK, let's try processes:

In [31]: from multiprocessing import Pool

In [32]: Pool(4).map(lambda k: process(df, k),  np.arange(0, 1000))
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-30-9132ed5c6c3f> in <module>()
----> 1 Pool(4).map(lambda k: process(df, k),  np.arange(0, 1000))

/Users/Arseny/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261
    262     def starmap(self, func, iterable, chunksize=None):

/Users/Arseny/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609
    610     def _set(self, i, obj):

/Users/Arseny/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    383                         break
    384                     try:
--> 385                         put(task)
    386                     except Exception as e:
    387                         job, ind = task[:2]

/Users/Arseny/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207
    208     def recv_bytes(self, maxlength=None):

/Users/Arseny/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53

PicklingError: Can't pickle <function <lambda> at 0x112839ea0>: attribute lookup <lambda> on __main__ failed

Fail: a lambda defined in __main__ can't be pickled, so it can't be shipped to worker processes. BTW, that's not the only pickle-related limitation (e.g. you can't pickle locally defined objects either).
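One common workaround (a sketch using only the standard library, not part of the original timings): swap the lambda for functools.partial over the module-level process function, since partial objects pickle fine as long as the wrapped function and its arguments do.

from functools import partial
from multiprocessing import Pool

import numpy as np

# partial(process, df) is picklable: `process` lives at module level
# and the dataframe itself survives pickling, unlike a lambda from __main__
with Pool(4) as pool:
    result = pool.map(partial(process, df), np.arange(0, 1000))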

Should we be sad at this point? Nope, we have the brilliant joblib. By the way, it's the library that powers most of the parallel computation inside your favorite scikit-learn.

In [33]: from joblib import Parallel, delayed

In [34]: %timeit Parallel(n_jobs=4)(delayed(process)(df, k) for k in np.arange(0, 1000))
1 loop, best of 3: 34 s per loop

Still two lines, even if the last one may look a bit unusual. But who cares, if you can load all the CPUs on your dev server and get the results faster?
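For the record, joblib can also grab every available core with n_jobs=-1; a tiny usage sketch reusing process and df from above:

from joblib import Parallel, delayed

import numpy as np

# n_jobs=-1 tells joblib to use all available CPU cores
result = Parallel(n_jobs=-1)(delayed(process)(df, k) for k in np.arange(0, 1000))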