Parallel apply in Python

Often in text analytics, we need to process many sentences as part of text pre-processing (e.g. stemming, stop word removal, punctuation removal) prior to creating the document-term matrix (DTM).

Processing the corpus serially can take quite a long time, so we often use symmetric multiprocessing (SMP) to speed things up. One of the most useful tools for this is the apply family of functions.

In R, this is relatively simple using parApply or %dopar%. We would simply use these in place of the familiar apply family of functions. You can learn more about the apply family of functions from the links below:

However, with Python there is no out-of-the-box implementation of parallel apply. And while it is not difficult to create your own parallel apply function, a common limitation is that you cannot pass in a dynamic list of parameters (unlike in R). While there are different workarounds available (see links below), they require a lot of wrapper functions, and using the zip function may not allow you to pass in the partitioned data frame easily.

Note: There’s a fork of multiprocessing called pathos that aims to resolve this issue; however, I have not been able to test it.

This short article will show you an example of how you can create your own parallel apply function that allows you to pass in a dynamic list of parameters, thereby making your code more generic and applicable across many different types of functions.

As this parallel apply function in Python requires only the multiprocessing and pandas packages, it should be easily portable.

I used code from two separate sites as references when creating this tutorial; you can find them here:

Base parApply Function

We start with the base parApply function as follows:

import pandas as pd
from multiprocessing import Pool, cpu_count

def parApply(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.concat(ret_list)

This function uses the pool’s map method to execute “func” on each partition of the grouped dataframe. Once the execution is done, the partitions are concatenated back to form the original dataframe.
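As an aside, iterating over a pandas groupby yields (name, group) pairs, so the list comprehension in parApply produces one sub-data-frame per group. A minimal illustration (the grpId and val columns here are made up for demonstration):

```python
import pandas as pd

# A toy frame with an explicit grouping column
df = pd.DataFrame({"grpId": [0, 0, 1], "val": [10, 20, 30]})

# Iterating over a groupby yields (name, group) pairs;
# keeping only 'group' gives the list of partitions that map will receive
parts = [group for name, group in df.groupby(df.grpId)]
```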

However, one of the major issues is that “map” here does not allow multiple parameters as inputs to the function.

Create Class wrapper for dynamic parameter list

The following code creates a wrapper class to capture the dynamic parameter list:


class WithExtraArgs(object):
    def __init__(self, func, **args):
        self.func = func
        self.args = args
    def __call__(self, df):
        return self.func(df, **self.args)

def applyParallel(dfGrouped, func, kwargs):
    with Pool(cpu_count()) as p:
        ret_list = p.map(WithExtraArgs(func, **kwargs), [group for name, group in dfGrouped])
    return pd.concat(ret_list)

The special syntax *args and **kwargs in function definitions is used to pass a variable number of arguments to a function. The single-asterisk form (*args) is used to pass a non-keyworded, variable-length argument list, and the double-asterisk form (**kwargs) is used to pass a keyworded, variable-length argument list.

Since the list of arguments a function takes may not be known in advance, it is better to use a keyworded, variable-length argument list.
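For instance, here is a quick sketch of how **kwargs collects keyword arguments into a dict (the describe function and its argument names are made up for illustration):

```python
def describe(name, *args, **kwargs):
    # *args arrives as a tuple; **kwargs arrives as a dict keyed by argument name
    return "%s args=%s kwargs=%s" % (name, args, sorted(kwargs))

# Positional extras land in args; keyword extras land in kwargs
out = describe("train", 1, 2, dest_col_name="clean", src_col_name="raw")
```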

Using the applyParallel function

To use the applyParallel function, you first define the function to be applied (in this example, remove_punct).

The remove_punct function strips all punctuation characters from a text column in the data frame and inserts the result as a new column at the given index with the given name. Note that the function has four parameters: the first is the partitioned data frame, followed by three others (destination column index, destination column name and source column name).


import re
import string

def remove_punct(df, dest_col_ind, dest_col_name, src_col_name):
    # re.escape keeps characters like ] and \ in string.punctuation from being
    # misread as character-class syntax
    df.insert(dest_col_ind, dest_col_name,
              df.apply(lambda x: re.sub('[' + re.escape(string.punctuation) + ']', '', x[src_col_name]), axis=1))
    return df

applyParallel(df_train.groupby(df_train.grpId), remove_punct,
              {"dest_col_ind": 4, "dest_col_name": "TargetCol", "src_col_name": "OriginalCol"})

You then simply call the applyParallel function with the grouped data frame, the function, and a dictionary of keyword-value pairs.

Please note the following:

  • The keywords in the argument list and the parameters of the function (i.e. remove_punct) have the same names. This allows automatic matching of each keyword to its parameter.
  • The remove_punct function has four parameters, but only three appear in the dynamic argument list. This is intentional, because the following line in the wrapper class WithExtraArgs already maps the first parameter to the data frame:
self.func(df, **self.args)

And so, with just the few lines of additional Python code above, you can now parallelise the apply function for use with your own functions without having to redefine the applyParallel function each time.

You can download the source code on GitHub for easy reference.

Hope this helps in your coding!
