Python Pandas Functions in Parallel

I’m always on the lookout for quick hacks and code snippets that might help improve efficiency. Most of the time those come from Stack Overflow, but here’s one dealing with parallelization and efficiency that I thought would be worth sharing.

Since Pandas doesn’t have an internal parallelism feature yet, applying functions to huge datasets is a pain when those functions have expensive computation times. One way to shorten that time is to split the dataset into separate pieces, run the apply function on each piece in parallel, and then re-concatenate the resulting pandas dataframes.

Let’s take an example pandas dataframe.

import pandas as pd
import numpy as np
import seaborn as sns
from multiprocessing import Pool

num_partitions = 10 #number of partitions to split dataframe
num_cores = 4 #number of cores on your machine

iris = pd.DataFrame(sns.load_dataset('iris'))

I’m going to use the multiprocessing package in Python and import Pool. Pool spins up new worker processes on the machine (processes rather than threads, so the work isn’t serialized by the GIL).

def parallelize_dataframe(df, func):
    # Split the dataframe into chunks, apply func to each chunk in a
    # separate worker process, then glue the results back together.
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

df and func are the dataframe and the function being applied to it, respectively. np.array_split breaks the dataframe into num_partitions pieces. Note that if you ask for more partitions than there are rows in the dataframe, the split will throw an error.

Instantiate a Pool instance with the number of cores on your machine. pool.map then applies func to each partitioned dataframe in the list by iterating through it, and pd.concat re-concatenates all of the pieces into a single dataframe again.
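If you’d rather not rely on the module-level num_partitions and num_cores, here’s a slightly tidier variant (my own sketch, not part of the original snippet) that takes the core count as an argument and uses Pool as a context manager, so the workers get cleaned up even if func raises:

def parallelize_dataframe_ctx(df, func, n_cores=4):
    # Same idea without globals: one chunk per worker, map, then concat.
    chunks = np.array_split(df, n_cores)
    with Pool(n_cores) as pool:
        result = pd.concat(pool.map(func, chunks))
    return result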

Example:

def multiply_columns(data):
    # Add a column with the length of each species name.
    data['length_of_word'] = data['species'].apply(lambda x: len(x))
    return data
    
iris = parallelize_dataframe(iris, multiply_columns)
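
A quick sanity check (this bit is mine, not from the snippet above): in the standard iris dataset the first rows are all 'setosa', so the new column should read 6 there.

print(iris[['species', 'length_of_word']].head())
# the first five rows are all 'setosa', so length_of_word is 6 for each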

If you use AWS and maximize the number of cores, this can vastly improve the speed of expensive functions on pandas dataframes. I’m not too sure how memory-efficient the re-concatenation of the dataframes is, though. More on the map function is here.

Let me know if it works for you.

7 comments

    1. No. The default Python implementation (CPython) has a global interpreter lock (for reasons mysterious to me), which prevents multiple threads from executing Python code at the same time.

  1. Possible that there’s a lot more going on.

    I’ve got the default NumPy anaconda installation with OpenBLAS, and it shows that all the cores are used to perform some matrix computation (SVD in my case). When I tried to run SVD on a list of random matrices in parallel, the result was actually slower than if I had done it serially. Some googling matched my intuition – a lot of the base numerical routines are already optimized to run in parallel, so they utilize resources much more efficiently if you run them serially than if you run them in parallel Python processes.

    Maybe this works for more straightforward operations (as is common in pandas).
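
    If anyone wants to experiment with that, the knob I’d reach for (my rough sketch, not something from the post) is capping the BLAS/OpenMP thread count before numpy gets imported in each process, so the library-level parallelism doesn’t fight with the Python-level processes:

    import os
    # Must run before numpy (and its BLAS backend) is imported in the process.
    os.environ['OPENBLAS_NUM_THREADS'] = '1'
    os.environ['OMP_NUM_THREADS'] = '1'
    import numpy as np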

  2. Thanks for the concise example of how to parallelize a function. However, this is also a good example of a case where parallelizing is not nearly as helpful as using the built-in vectorized Pandas function.

    I clocked your code `iris = parallelize_dataframe(iris, multiply_columns)` (which calls `apply(lambda x: len(x))`) vs `iris['length_of_word'] = iris['species'].str.len()` and the vectorized str.len() function was much faster.

    I wrote a post on exploring this at: https://maxpowerwastaken.github.io/blog/pandas-dont-apply-_-vectorize/
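
    Roughly, the comparison can be reproduced with something like this (a sketch assuming the setup from the post; exact numbers will vary by machine):

    import timeit
    # Time each approach a few times on the 150-row iris frame.
    t_parallel = timeit.timeit(lambda: parallelize_dataframe(iris, multiply_columns), number=10)
    t_vectorized = timeit.timeit(lambda: iris['species'].str.len(), number=10)
    print(t_parallel, t_vectorized)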

  3. Did you get a chance to test the above? For me, the results are a lot worse with parallelize_dataframe:

    In [14]: import pandas as pd
    …: import numpy as np
    …: import seaborn as sns
    …: from multiprocessing import Pool
    …:
    …: num_partitions = 10 #number of partitions to split dataframe
    …: num_cores = 4 #number of cores on your machine
    …:
    …: iris = pd.DataFrame(sns.load_dataset('iris'))
    …:
    …: def parallelize_dataframe(df, func):
    …:     df_split = np.array_split(df, num_partitions)
    …:     pool = Pool(num_cores)
    …:     df = pd.concat(pool.map(func, df_split))
    …:     pool.close()
    …:     pool.join()
    …:     return df
    …:
    …: def multiply_columns(data):
    …:     data['length_of_word'] = data['species'].apply(lambda x: len(x))
    …:     return data
    …:
    …: with timeit():
    …:     iris = parallelize_dataframe(iris, multiply_columns)
    …:
    …: with timeit():
    …:     iris['length_of_word'] = iris['species'].apply(lambda x: len(x))
    …:
    runtime: 0:00:00.116576
    runtime: 0:00:00.001571
