pyspark.sql.PandasCogroupedOps.applyInPandas

PandasCogroupedOps.applyInPandas(func: PandasCogroupedMapFunction, schema: Union[pyspark.sql.types.StructType, str]) → pyspark.sql.dataframe.DataFrame[source]

Applies a function to each cogroup using pandas and returns the result as a DataFrame.

The function should take two pandas.DataFrames and return another pandas.DataFrame. Alternatively, the user can pass a function that takes a tuple of the grouping key(s) and the two pandas.DataFrames. For each side of the cogroup, all columns are passed together as a pandas.DataFrame to the user-function and the returned pandas.DataFrame are combined as a DataFrame.

The schema should be a StructType describing the schema of the returned pandas.DataFrame. The column labels of the returned pandas.DataFrame must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. The length of the returned pandas.DataFrame can be arbitrary.

New in version 3.0.0.

Changed in version 3.4.0: Support Spark Connect.

Parameters
funcfunction

a Python native function that takes two pandas.DataFrames, and outputs a pandas.DataFrame, or that takes one tuple (grouping keys) and two pandas.DataFrames, and outputs a pandas.DataFrame.

schemapyspark.sql.types.DataType or str

the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.

Notes

This function requires a full shuffle. All the data of a cogroup will be loaded into memory, so the user should be aware of the potential OOM risk if data is skewed and certain groups are too large to fit in memory.

This API is experimental.

Examples

>>> from pyspark.sql.functions import pandas_udf
>>> df1 = spark.createDataFrame(
...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
...     ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> def asof_join(l, r):
...     return pd.merge_asof(l, r, on="time", by="id")
...
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
...     asof_join, schema="time int, id int, v1 double, v2 string"
... ).show()  
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

Alternatively, the user can define a function that takes three arguments. In this case, the grouping key(s) will be passed as the first argument and the data will be passed as the second and third arguments. The grouping key(s) will be passed as a tuple of numpy data types, e.g., numpy.int32 and numpy.float64. The data will still be passed in as two pandas.DataFrame containing all columns from the original Spark DataFrames.

>>> def asof_join(k, l, r):
...     if k == (1,):
...         return pd.merge_asof(l, r, on="time", by="id")
...     else:
...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
...
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
...     asof_join, "time int, id int, v1 double, v2 string").show()  
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+