Introducing Pandas UDFs for PySpark

By Two Sigma on November 21, 2017
Picture of Lego Figures

Two Sigma researcher Li Jin introduces the Pandas UDFs feature in the upcoming Apache Spark 2.3 release.

Note: This post was updated on March 2, 2018.

This article—a version of which originally appeared on the Databricks blog—introduces the Pandas UDFs (formerly Vectorized UDFs) feature in the upcoming Apache Spark 2.3 release, which substantially improves the performance and usability of user-defined functions (UDFs) in Python.

Over the past few years, Python has become the default language for data scientists. Packages such as pandasnumpystatsmodel, and scikit-learn have gained great adoption and become the mainstream toolkits. At the same time, Apache Spark has become the de facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala, and then invoke them from Python.

Pandas UDFs built on top of Apache Arrow bring you the best of both worlds—the ability to define low-overhead, high-performance UDFs entirely in Python.

In Spark 2.3, there will be two kinds of Pandas UDFs: scalar and grouped map. Next, we illustrate their usage using four example programs: Plus One, Cumulative Probability, Subtract Mean, Ordinary Least Squares Linear Regression.

Scalar Pandas UDFs

Scalar Pandas UDFs are used for vectorizing scalar operations. To define a scalar Pandas UDF, simply use @pandas_udf to annotate a Python function that takes in pandas.Series as arguments and returns another pandas.Series of the same size. Below we illustrate using two examples: Plus One and Cumulative Probability.

Plus One

Computing v + 1 is a simple example for demonstrating differences between row-at-a-time UDFs and scalar Pandas UDFs. Note that built-in column operators can perform much faster in this scenario.

Using row-at-a-time UDFs:

Using Pandas UDFs:

The examples above define a row-at-a-time UDF “plus_one” and a scalar Pandas UDF “pandas_plus_one” that performs the same “plus one” computation. The UDF definitions are the same except the function decorators: “udf” vs “pandas_udf”.

In the row-at-a-time version, the user-defined function takes a double “v” and returns the result of “v + 1” as a double. In the Pandas version, the user-defined function takes a pandas.Series “v” and returns the result of “v + 1” as a pandas.Series. Because “v + 1” is vectorized on pandas.Series, the Pandas version is much faster than the row-at-a-time version.

Note that there are two important requirements when using scalar Pandas UDFs:

  • The input and output series must have the same size.
  • How a column is split into multiple pandas.Series is internal to Spark, and therefore the result of user-defined function must be independent of the splitting.

Cumulative Probability

This example shows a more practical use of the Pandas UDF: computing the cumulative probability of a value in a normal distribution N(0,1) using scipy package.

stats.norm.cdf works both on a scalar value and pandas.Series, and this example can be written with the row-at-a-time UDFs as well. Similar to the previous example, the Pandas version runs much faster, as shown later in the “Performance Comparison” section.

Grouped Map Pandas UDFs

Python users are fairly familiar with the split-apply-combine pattern in data analysis. The grouped map Pandas UDFs are designed for this scenario, and they operate on all the data for some group, e.g., “for each date, apply this operation”.

Grouped map Pandas UDFs first splits a Spark DataFrame into groups based on the conditions specified in the groupby operator, applies a user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, combines and returns the results as a new Spark DataFrame.

Grouped map Pandas UDFs uses the same function decorator pandas_udf as scalar Pandas UDFs, but they have a few differences:

  • Input of the user-defined function:
    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • Output of the user-defined function:
    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • Grouping semantics:
    • Scalar: no grouping semantics
    • Grouped map: defined by “groupby” clause
  • Output size:
    • Scalar: same as input size
    • Grouped map: any size
  • Return types in the function decorator:
    • Scalar: a DataType that specifies the type of the returned pandas.Series
    • Grouped map: a StructType that specifies each column name and type of the returned pandas.DataFrame

Next, let us walk through two examples to illustrate the use cases of grouped map Pandas UDFs.

Subtract Mean

This example shows a simple use of grouped map Pandas UDFs: subtracting mean from each value in the group.

In this example, we subtract mean of v from each value of v for each group. The grouping semantics is defined by the “groupby” function, i.e, each input pandas.DataFrame to the user-defined function has the same “id” value. The input and output schema of this user-defined function are the same, so we pass “df.schema” to the decorator pandas_udf for specifying the schema.

Grouped map Pandas UDFs can also be called as standalone Python functions on the driver. This is very useful for debugging, for example:

In the example above, we first convert a small subset of Spark DataFrame to a pandas.DataFrame, and then run subtract_mean as a standalone Python function on it. After verifying the function logics, we can call the UDF with Spark over the entire dataset.

Ordinary Least Squares Linear Regression

The last example shows how to run OLS linear regression for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2) according to statistical model Y = bX + c.

This example demonstrates that grouped map Pandas UDFs can be used with any arbitrary python function: pandas.DataFrame -> pandas.DataFrame. The returned pandas.DataFrame can have different number rows and columns as the input.

Performance Comparison

Lastly, we want to show performance comparison between row-at-a-time UDFs and Pandas UDFs. We ran micro benchmarks for three of the above examples (plus one, cumulative probability and subtract mean).

Configuration and Methodology

We ran the benchmark on a single node Spark cluster on Databricks community edition.

Configuration details:
Data: A 10M-row DataFrame with a Int column and a Double column
Cluster: 6.0 GB Memory, 0.88 Cores, 1 DBU
Databricks runtime version: Latest RC (4.0, Scala 2.11)

For the detailed implementation of the benchmark, check the Pandas UDF Notebook.

Performance Comparision

As shown in the charts, Pandas UDFs perform much better than row-at-a-time UDFs across the board, ranging from 3x to over 100x.

Conclusion and Future Work

The upcoming Spark 2.3 release lays down the foundation for substantially improving the capabilities and performance of user-defined functions in Python. In the future, we plan to introduce support for Pandas UDFs in aggregations and window functions. The related work can be tracked in SPARK-22216.

You can try the Pandas UDF notebook and this feature is now available as part of Databricks Runtime 4.0 beta.

Pandas UDFs is a collaborative effort by many people in the Apache Spark and Apache Arrow community, including Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li, and many others.

This article is not an endorsement by Two Sigma of the papers discussed, their viewpoints or the companies discussed. The views expressed above reflect those of the authors and are not necessarily the views of Two Sigma Investments, LP or any of its affiliates (collectively, “Two Sigma”).  The information presented above is only for informational and educational purposes and is not an offer to sell or the solicitation of an offer to buy any securities or other instruments. Additionally, the above information is not intended to provide, and should not be relied upon for investment, accounting, legal or tax advice. Two Sigma makes no representations, express or implied, regarding the accuracy or completeness of this information, and the reader accepts all risks in relying on the above information for any purpose whatsoever. Click here for other important disclaimers and disclosures.