PandasCogroupedOps

PandasCogroupedOps is a logical grouping created by GroupedData.cogroup over two GroupedDatas.

from pyspark.sql.pandas.group_ops import PandasCogroupedOps

PandasCogroupedOps is included in __all__ of the pyspark.sql module (via __init__.py).
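
Because of that, it can be imported directly from pyspark.sql:

from pyspark.sql import PandasCogroupedOps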

Creating Instance

PandasCogroupedOps takes the following to be created:

  • GroupedData (gd1)
  • GroupedData (gd2)

PandasCogroupedOps is created when:

  • PandasGroupedOpsMixin is requested to cogroup
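
For demonstration, a minimal sketch (assuming a SparkSession available as spark, and two DataFrames df1 and df2 as in the Example below):

gd1 = df1.groupby("id")
gd2 = df2.groupby("id")

cogrouped = gd1.cogroup(gd2)
print(type(cogrouped))
# <class 'pyspark.sql.pandas.group_ops.PandasCogroupedOps'>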

applyInPandas

applyInPandas(
  self,
  func: "PandasCogroupedMapFunction",
  schema: Union[StructType, str]
) -> DataFrame

where PandasCogroupedMapFunction is the following type alias:

from pandas.core.frame import DataFrame as PandasDataFrame
DataFrameLike = PandasDataFrame
PandasCogroupedMapFunction = Union[
  # func: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
  Callable[[DataFrameLike, DataFrameLike], DataFrameLike],
  # func: (groupKey(s), pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
  Callable[[Any, DataFrameLike, DataFrameLike], DataFrameLike],
]

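In other words, func can take either the two cogrouped pandas DataFrames, or the grouping key(s) followed by the two DataFrames. A minimal sketch of both shapes (the function and column names are illustrative):

import pandas as pd

# Two-argument variant: the two cogrouped pandas DataFrames
def join_frames(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
  return pd.merge(l, r, on="id")

# Three-argument variant: the grouping key(s) come first, as a tuple
def join_frames_with_key(key, l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
  return pd.merge(l, r, on="id")
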
applyInPandas creates a DataFrame with the result of flatMapCoGroupsInPandas with a pandas user-defined function of SQL_COGROUPED_MAP_PANDAS_UDF type.

Internally, applyInPandas creates a pandas user-defined function for the given func with the return type defined by the given schema. The pandas UDF is of SQL_COGROUPED_MAP_PANDAS_UDF type.

applyInPandas applies the pandas UDF to all the columns of the two GroupedDatas (which creates a Column expression).

In the end, applyInPandas requests the GroupedData for the associated RelationalGroupedDataset, which is in turn requested to flatMapCoGroupsInPandas.
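
Put together, the body of applyInPandas boils down to the following simplified sketch (modeled after pyspark.sql.pandas.group_ops; the private attribute names and exact imports vary across Spark versions):

from pyspark.rdd import PythonEvalType  # pyspark.util in newer versions
from pyspark.sql.functions import pandas_udf

# Inside PandasCogroupedOps.applyInPandas(self, func, schema):

# 1. A pandas UDF of SQL_COGROUPED_MAP_PANDAS_UDF type for func and schema
udf = pandas_udf(
  func, returnType=schema,
  functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)

# 2. The UDF applied to all the columns of the two GroupedDatas
#    (produces a single Column expression)
all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
udf_column = udf(*all_cols)

# 3. The RelationalGroupedDataset of the first GroupedData is requested
#    to flatMapCoGroupsInPandas; the result is wrapped in a new DataFrame
jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
return DataFrame(jdf, self.sql_ctx)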

Example

import pandas as pd

df1 = spark.createDataFrame(
    data = [
      (20000101, 1, 1.0),
      (20000101, 2, 2.0),
      (20000102, 1, 3.0),
      (20000102, 2, 4.0)],
    schema = ("time", "id", "v1"))
df2 = spark.createDataFrame(
    data = [
      (20000101, 1, "x"),
      (20000101, 2, "y")],
    schema = ("time", "id", "v2"))

# The three-argument variant: k is the grouping key (a tuple)
def asof_join(k, l, r):
  if k == (1,):
    # As-of join of the two cogrouped pandas DataFrames
    return pd.merge_asof(l, r, on="time", by="id")
  else:
    # Empty result for any other group key
    return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])

gd1 = df1.groupby("id")
gd2 = df2.groupby("id")

# Wrap the chained calls in parentheses so the expression parses
(gd1
  .cogroup(gd2)
  .applyInPandas(
    asof_join,
    "time int, id int, v1 double, v2 string")
  .show())
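
The above should print something like the following (only the id == 1 group produces rows; the other group returns an empty frame):

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+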