pandas User-Defined Functions

pandas User-Defined Functions (Vectorized User-Defined Functions, or pandas UDFs) are user-defined functions that use Apache Arrow to transfer data and pandas to work with the data, which allows for vectorized operations.

pandas UDFs are defined using the @pandas_udf decorator.

In general, a pandas UDF behaves like a regular PySpark function.

As of Spark 3.0.0 (SPARK-28264), using Python type hints in pandas UDFs is encouraged (instead of specifying the pandas UDF type via the functionType argument).

The type hints of a user-defined function should be as follows:

  • pandas.Series in most cases
  • pandas.DataFrame for struct input or output (see the sketch after this list)
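
A minimal sketch of a struct-returning pandas UDF (the split_name function and the first/last field names are made up for illustration):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# The struct return type is declared as a DDL string and the UDF
# returns a pandas.DataFrame whose columns are the struct fields
@pandas_udf("first string, last string")
def split_name(full_name: pd.Series) -> pd.DataFrame:
    parts = full_name.str.split(" ", n=1, expand=True)
    return parts.rename(columns={0: "first", 1: "last"})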

@pandas_udf Decorator

pandas_udf(
  f=None,
  returnType=None,
  functionType=None)

pandas_udf function is used as a decorator (using the @pandas_udf annotation).

Python Decorators

Learn more in PEP 318 – Decorators for Functions and Methods.

pandas_udf belongs to pyspark.sql.functions module.

from pyspark.sql.functions import pandas_udf
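
Besides the decorator form, pandas_udf can also be called as a regular function to wrap an existing callable; a minimal sketch (the slen name is illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Wrap a lambda directly; equivalent to the @pandas_udf decorator form
slen = pandas_udf(lambda s: s.str.len(), returnType="long")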

functionType

functionType can be one of the PandasUDFType values (though specifying it is discouraged in favour of Python type hints).

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(returnType="long", functionType=PandasUDFType.GROUPED_AGG)
def my_udaf(names: pd.Series) -> int:
    # A grouped aggregate UDF returns a single scalar per group
    return names.count()

functionType is also known as evalType.

SQL_SCALAR_PANDAS_UDF is the default functionType (evalType) when none is specified.
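
A quick check of the default (a sketch, assuming the decorated function exposes an evalType attribute, as recent PySpark versions do):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("long")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1

# No functionType given, so the UDF gets the default scalar eval type
# (PandasUDFType.SCALAR corresponds to SQL_SCALAR_PANDAS_UDF)
assert plus_one.evalType == PandasUDFType.SCALAR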

returnType

The @pandas_udf decorator can optionally specify a return type (as the first positional argument or using the returnType argument).

A return type can be given as the name of a pyspark.sql.types.DataType (a DDL-formatted string) or as a DataType instance itself.

@pandas_udf(dataType)
@pandas_udf(returnType=dataType)
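
For example (a sketch; plus_one and minus_one are illustrative names):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType

# returnType as the name of a data type (a DDL-formatted string)...
@pandas_udf("long")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1

# ...or as a DataType instance
@pandas_udf(returnType=LongType())
def minus_one(v: pd.Series) -> pd.Series:
    return v - 1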

pandas UDAFs

pandas User-Defined Aggregate Functions (pandas UDAFs) are Series-to-scalar pandas UDFs, typically used with groupBy().agg (see the sketch below).
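
A minimal sketch of a pandas UDAF declared with type hints only (pandas.Series in, scalar out); mean_age and the group column are made up for illustration:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Series-to-scalar (grouped aggregate) pandas UDF
@pandas_udf("double")
def mean_age(age: pd.Series) -> float:
    return age.mean()

# Used with groupBy().agg; assumes a DataFrame with 'group' and 'age' columns
df.groupBy("group").agg(mean_age(df.age)).show()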

Demo

import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
@pandas_udf("string")
def my_concat(names: pd.Series, ages: pd.Series) -> pd.Series:
    return pd.Series([f"{n} is {a} years old" for (n, a) in zip(names, ages)])
pandas_df = pd.DataFrame({
  'name': ['jacek', 'agata', 'iweta', 'patryk', 'maksym'],
  'age': [50, 49, 29, 26, 11]
  })
df = spark.createDataFrame(pandas_df)
>>> df.show()
+------+---+
|  name|age|
+------+---+
| jacek| 50|
| agata| 49|
| iweta| 29|
|patryk| 26|
|maksym| 11|
+------+---+
>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
(df
  .select(to_upper(df.name).alias("upper_name"))
  .show())
+----------+
|upper_name|
+----------+
|     JACEK|
|     AGATA|
|     IWETA|
|    PATRYK|
|    MAKSYM|
+----------+
df.select(my_concat(df.name, df.age)).show(truncate=False)
+----------------------+
|my_concat(name, age)  |
+----------------------+
|jacek is 50 years old |
|agata is 49 years old |
|iweta is 29 years old |
|patryk is 26 years old|
|maksym is 11 years old|
+----------------------+