PandasCogroupedOps¶
PandasCogroupedOps is a logical grouping created by GroupedData.cogroup over two GroupedDatas.
from pyspark.sql.pandas.group_ops import PandasCogroupedOps
PandasCogroupedOps is included in `__all__` of the pyspark.sql module (via `__init__.py`).
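Since it is part of `__all__`, the class can also be imported from the pyspark.sql package directly (a minimal sketch):

```python
# PandasCogroupedOps is re-exported by pyspark.sql's __init__.py,
# so both import paths below refer to the same class.
from pyspark.sql import PandasCogroupedOps
from pyspark.sql.pandas.group_ops import PandasCogroupedOps as PandasCogroupedOpsImpl

assert PandasCogroupedOps is PandasCogroupedOpsImpl
```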
Creating Instance¶
PandasCogroupedOps takes the following to be created:

* Two GroupedDatas (the cogrouped datasets)

PandasCogroupedOps is created when:

* PandasGroupedOpsMixin is requested to cogroup
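For illustration, a minimal sketch of how an instance comes into being (assumes a running SparkSession `spark` and two DataFrames `df1` and `df2` that share an `id` column, e.g. the ones in the Example section below):

```python
from pyspark.sql.pandas.group_ops import PandasCogroupedOps

# GroupedData.cogroup (from PandasGroupedOpsMixin) pairs up two GroupedDatas
# and returns a PandasCogroupedOps.
cogrouped = df1.groupby("id").cogroup(df2.groupby("id"))
assert isinstance(cogrouped, PandasCogroupedOps)
```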
applyInPandas¶
applyInPandas(
    self,
    func: "PandasCogroupedMapFunction",
    schema: Union[StructType, str]
) -> DataFrame

from pandas.core.frame import DataFrame as PandasDataFrame

DataFrameLike = PandasDataFrame

PandasCogroupedMapFunction = Union[
    # func: (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
    Callable[[DataFrameLike, DataFrameLike], DataFrameLike],
    # func: (groupKey(s), pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame
    Callable[[Any, DataFrameLike, DataFrameLike], DataFrameLike],
]
applyInPandas creates a DataFrame with the result of flatMapCoGroupsInPandas with a pandas user-defined function of SQL_COGROUPED_MAP_PANDAS_UDF type.

applyInPandas creates a pandas user-defined function for the given func with the return type defined by the given schema. The pandas UDF is of SQL_COGROUPED_MAP_PANDAS_UDF type.

applyInPandas applies the pandas UDF to all the columns of the two GroupedDatas (which creates a Column expression).

In the end, applyInPandas requests the GroupedData for the associated RelationalGroupedDataset that is in turn requested to flatMapCoGroupsInPandas.
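The Example section below demonstrates the three-argument (key-aware) variant of func. As a complementary sketch, the two-argument variant receives only the two cogrouped pandas DataFrames; the join_frames function below is hypothetical and assumes the df1 and df2 DataFrames from the Example:

```python
import pandas as pd

# Key-less variant: func receives just the two cogrouped pandas DataFrames.
def join_frames(l, r):
    return pd.merge(l, r, on=["time", "id"])

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    join_frames,
    "time int, id int, v1 double, v2 string"
).show()
```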
Example¶
df1 = spark.createDataFrame(
    data=[
        (20000101, 1, 1.0),
        (20000101, 2, 2.0),
        (20000102, 1, 3.0),
        (20000102, 2, 4.0)],
    schema=("time", "id", "v1"))

df2 = spark.createDataFrame(
    data=[
        (20000101, 1, "x"),
        (20000101, 2, "y")],
    schema=("time", "id", "v2"))

import pandas as pd

# Asof-join the two cogrouped pandas DataFrames, but only for the group
# with key id == 1; other groups produce an empty pandas DataFrame.
def asof_join(k, l, r):
    if k == (1,):
        return pd.merge_asof(l, r, on="time", by="id")
    else:
        return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])

gd1 = df1.groupby("id")
gd2 = df2.groupby("id")

gd1.cogroup(gd2).applyInPandas(
    asof_join,
    "time int, id int, v1 double, v2 string"
).show()
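For reference, with a local SparkSession the example is expected to produce output along these lines (only the id == 1 group survives the asof join; row order may differ):

```text
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+
```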