pyspark.sql.DataFrame.mapInArrow
- DataFrame.mapInArrow(func, schema, barrier=False, profile=None)
Maps an iterator of batches in the current DataFrame using a Python native function that takes and outputs pyarrow.RecordBatch objects, and returns the result as a DataFrame.

This method applies the specified Python function to an iterator of pyarrow.RecordBatch objects, each representing a batch of rows from the original DataFrame. The returned iterator of pyarrow.RecordBatch objects is combined into a DataFrame. The size of the function's input and output can be different. The size of each pyarrow.RecordBatch can be controlled by spark.sql.execution.arrow.maxRecordsPerBatch.

New in version 3.3.0.
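For instance, the batch size cap can be tuned before calling mapInArrow. This is a minimal sketch assuming an active SparkSession named spark, as in the examples below; 1000 is an arbitrary illustrative value:

>>> # Cap each Arrow batch handed to func at 1000 rows (illustrative value).
>>> spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")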
- Parameters
- func : function
a Python native function that takes an iterator of pyarrow.RecordBatch objects and outputs an iterator of pyarrow.RecordBatch objects.
- schema : pyspark.sql.types.DataType or str
the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string; a short sketch follows this list.
- barrier : bool, optional, default False
Use barrier mode execution, ensuring that all Python workers in the stage are launched concurrently.
- profile : pyspark.resource.ResourceProfile, optional
The optional ResourceProfile to be used for mapInArrow; a sketch appears after the Examples.
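To illustrate a DDL-formatted schema string, and that the output size can differ from the input size, here is a minimal sketch. The name count_func is an illustrative helper, not part of the API, and the number of rows in the result depends on how the data is split into Arrow batches:

>>> import pyarrow
>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
>>> def count_func(iterator):  # illustrative helper, not part of the API
...     for batch in iterator:
...         # emit one single-row batch per input batch, so the
...         # output size differs from the input size
...         yield pyarrow.RecordBatch.from_pydict({"n": [batch.num_rows]})
...
>>> df.mapInArrow(count_func, "n long").show()  # "n long" is a DDL string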
Notes
This API is unstable and intended for developers.
Examples
>>> import pyarrow
>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
>>> def filter_func(iterator):
...     for batch in iterator:
...         pdf = batch.to_pandas()
...         yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1])
...
>>> df.mapInArrow(filter_func, df.schema).show()
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+
Set barrier to True to force the mapInArrow stage to run in barrier mode, which ensures that all Python workers in the stage are launched concurrently.

>>> df.mapInArrow(filter_func, df.schema, barrier=True).show()
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+
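Similarly, a ResourceProfile can be passed so that the mapInArrow stage runs with its own task resources. This is a sketch that assumes a cluster supporting stage-level scheduling; the request of 2 CPUs per task is an arbitrary illustrative amount:

>>> from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
>>> treqs = TaskResourceRequests().cpus(2)  # illustrative amount
>>> rp = ResourceProfileBuilder().require(treqs).build
>>> df.mapInArrow(filter_func, df.schema, profile=rp).show()
+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+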