#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import inspect
import numpy as np
import pandas as pd
import uuid
from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import (
ArrayType,
ByteType,
DataType,
DoubleType,
FloatType,
IntegerType,
LongType,
ShortType,
StringType,
StructType,
)
from pyspark.ml.util import try_remote_functions
from typing import Any, Callable, Iterator, List, Mapping, TYPE_CHECKING, Tuple, Union, Optional
if TYPE_CHECKING:
from pyspark.sql._typing import UserDefinedFunctionLike
supported_scalar_types = (
ByteType,
ShortType,
IntegerType,
LongType,
FloatType,
DoubleType,
StringType,
)
# Callable type for end-user predict functions that take a variable number of ndarrays as
# input and return one of the following as output:
# - single ndarray (single output)
# - dictionary of named ndarrays (multiple outputs represented in columnar form)
# - list of dictionaries of named ndarrays (multiple outputs represented in row form)
PredictBatchFunction = Callable[
[np.ndarray], Union[np.ndarray, Mapping[str, np.ndarray], List[Mapping[str, np.ndarray]]]
]
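#
# For illustration only, a few hypothetical user functions that conform to this type
# (a minimal sketch; the names below are made up and are not part of this module):
#
#   def predict_single(inputs: np.ndarray) -> np.ndarray:
#       # single ndarray in, single ndarray out
#       return inputs * 2
#
#   def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
#       # multiple ndarrays in, dictionary of named ndarrays out (columnar form)
#       return {"sum": x1 + x2, "diff": x1 - x2}
#
#   def predict_rows(inputs: np.ndarray) -> List[Mapping[str, float]]:
#       # multiple outputs in row form: one dictionary per input row
#       return [{"mean": float(row.mean()), "max": float(row.max())} for row in inputs]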
@try_remote_functions
def vector_to_array(col: Column, dtype: str = "float64") -> Column:
"""
Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
.. versionadded:: 3.0.0
.. versionchanged:: 3.5.0
Supports Spark Connect.
Parameters
----------
col : :py:class:`pyspark.sql.Column` or str
Input column
dtype : str, optional
The data type of the output array. Valid values: "float64" or "float32".
Returns
-------
:py:class:`pyspark.sql.Column`
The converted column of dense arrays.
Examples
--------
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.functions import vector_to_array
>>> from pyspark.mllib.linalg import Vectors as OldVectors
>>> df = spark.createDataFrame([
... (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
... (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),
... OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
... ["vec", "oldVec"])
>>> df1 = df.select(vector_to_array("vec").alias("vec"),
... vector_to_array("oldVec").alias("oldVec"))
>>> df1.collect()
[Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
>>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"),
... vector_to_array("oldVec", "float32").alias("oldVec"))
>>> df2.collect()
[Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
>>> df1.schema.fields
[StructField('vec', ArrayType(DoubleType(), False), False),
StructField('oldVec', ArrayType(DoubleType(), False), False)]
>>> df2.schema.fields
[StructField('vec', ArrayType(FloatType(), False), False),
StructField('oldVec', ArrayType(FloatType(), False), False)]
"""
sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
return Column(
sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype)
)
@try_remote_functions
def array_to_vector(col: Column) -> Column:
"""
Converts a column of arrays of numeric type into a column of
pyspark.ml.linalg.DenseVector instances.
.. versionadded:: 3.1.0
.. versionchanged:: 3.5.0
Supports Spark Connect.
Parameters
----------
col : :py:class:`pyspark.sql.Column` or str
Input column
Returns
-------
:py:class:`pyspark.sql.Column`
The converted column of dense vectors.
Examples
--------
>>> from pyspark.ml.functions import array_to_vector
>>> df1 = spark.createDataFrame([([1.5, 2.5],),], schema='v1 array<double>')
>>> df1.select(array_to_vector('v1').alias('vec1')).collect()
[Row(vec1=DenseVector([1.5, 2.5]))]
>>> df2 = spark.createDataFrame([([1.5, 3.5],),], schema='v1 array<float>')
>>> df2.select(array_to_vector('v1').alias('vec1')).collect()
[Row(vec1=DenseVector([1.5, 3.5]))]
>>> df3 = spark.createDataFrame([([1, 3],),], schema='v1 array<int>')
>>> df3.select(array_to_vector('v1').alias('vec1')).collect()
[Row(vec1=DenseVector([1.0, 3.0]))]
"""
sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
def _batched(
data: Union[pd.Series, pd.DataFrame, Tuple[pd.Series]], batch_size: int
) -> Iterator[pd.DataFrame]:
"""Generator that splits a pandas dataframe/series into batches."""
if isinstance(data, pd.DataFrame):
df = data
elif isinstance(data, pd.Series):
df = pd.concat((data,), axis=1)
else: # isinstance(data, Tuple[pd.Series]):
df = pd.concat(data, axis=1)
index = 0
data_size = len(df)
while index < data_size:
yield df.iloc[index : index + batch_size]
index += batch_size
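# A rough sketch of the batching behavior (illustrative only, not executed here):
#
#   pdf = pd.DataFrame({"x": range(5)})
#   [len(b) for b in _batched(pdf, batch_size=2)]  # -> [2, 2, 1]
#
# Each yielded item is a pd.DataFrame slice that preserves the original columns.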
def _is_tensor_col(data: Union[pd.Series, pd.DataFrame]) -> bool:
if isinstance(data, pd.Series):
return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
elif isinstance(data, pd.DataFrame):
return any(data.dtypes == np.object_) and any(
[isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
)
else:
raise ValueError(
"Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data))
)
def _has_tensor_cols(data: Union[pd.Series, pd.DataFrame, Tuple[pd.Series]]) -> bool:
"""Check if input Series/DataFrame/Tuple contains any tensor-valued columns."""
if isinstance(data, (pd.Series, pd.DataFrame)):
return _is_tensor_col(data)
else: # isinstance(data, Tuple):
return any(_is_tensor_col(elem) for elem in data)
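# For example (illustrative only): a tensor column is detected because its pandas dtype is
# object and its values are lists/ndarrays, while a plain numeric column is not:
#
#   _has_tensor_cols(pd.Series([[0.0, 1.0], [2.0, 3.0]]))  # -> True  (flattened tensors)
#   _has_tensor_cols(pd.Series([0.0, 1.0, 2.0]))           # -> False (scalars)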
def _validate_and_transform_multiple_inputs(
batch: pd.DataFrame, input_shapes: List[Optional[List[int]]], num_input_cols: int
) -> List[np.ndarray]:
multi_inputs = [batch[col].to_numpy() for col in batch.columns]
if input_shapes:
if len(input_shapes) == num_input_cols:
multi_inputs = [
np.vstack(v).reshape([-1] + input_shapes[i]) # type: ignore
if input_shapes[i]
else v
for i, v in enumerate(multi_inputs)
]
if not all([len(x) == len(batch) for x in multi_inputs]):
raise ValueError("Input data does not match expected shape.")
else:
raise ValueError("input_tensor_shapes must match columns")
return multi_inputs
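# Rough sketch of the reshaping above (illustrative values): with two input columns and
# input_shapes = [[2, 2], None], the first column (flattened 2x2 tensors) is vstacked and
# reshaped to [batch_size, 2, 2], while the second column is passed through as a 1-D array.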
def _validate_and_transform_single_input(
batch: pd.DataFrame,
input_shapes: List[List[int] | None],
has_tensors: bool,
has_tuple: bool,
) -> np.ndarray:
# one or more input columns for a single expected input
if has_tensors:
# tensor columns
if len(batch.columns) == 1:
# one tensor column and one expected input, vstack rows
single_input = np.vstack(batch.iloc[:, 0])
else:
raise ValueError(
"Multiple input columns found, but model expected a single "
"input, use `array` to combine columns into tensors."
)
else:
# scalar columns
if len(batch.columns) == 1:
# single scalar column, remove extra dim
np_batch = batch.to_numpy()
single_input = np.squeeze(np_batch, -1) if len(np_batch.shape) > 1 else np_batch
if input_shapes and input_shapes[0] not in [None, [], [1]]:
raise ValueError("Invalid input_tensor_shape for scalar column.")
elif not has_tuple:
# columns grouped via `array`, convert to single tensor
single_input = batch.to_numpy()
if input_shapes and input_shapes[0] != [len(batch.columns)]:
raise ValueError("Input data does not match expected shape.")
else:
raise ValueError(
"Multiple input columns found, but model expected a single "
"input, use `array` to combine columns into tensors."
)
# if input_tensor_shapes provided, try to reshape input
if input_shapes:
if len(input_shapes) == 1:
single_input = single_input.reshape([-1] + input_shapes[0]) # type: ignore
if len(single_input) != len(batch):
raise ValueError("Input data does not match expected shape.")
else:
raise ValueError("Multiple input_tensor_shapes found, but model expected one input")
return single_input
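# Rough summary of the cases above (mirrors the in-line comments; illustrative only):
#   - one tensor column            -> rows vstacked into an ndarray
#   - one scalar column            -> 1-D ndarray of length batch_size
#   - non-tuple multi-column batch -> batch.to_numpy(), shape [batch_size, num_columns]
#   - finally, input_shapes[0] (if provided) is used to reshape the result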
def _validate_and_transform_prediction_result(
preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
num_input_rows: int,
return_type: DataType,
) -> pd.DataFrame | pd.Series:
"""Validate numpy-based model predictions against the expected pandas_udf return_type and
transforms the predictions into an equivalent pandas DataFrame or Series."""
if isinstance(return_type, StructType):
struct_rtype: StructType = return_type
fieldNames = struct_rtype.names
if isinstance(preds, dict):
# dictionary of columns
predNames = list(preds.keys())
for field in struct_rtype.fields:
if isinstance(field.dataType, ArrayType):
if len(preds[field.name].shape) == 2:
preds[field.name] = list(preds[field.name])
else:
raise ValueError(
"Prediction results for ArrayType must be two-dimensional."
)
elif isinstance(field.dataType, supported_scalar_types):
if len(preds[field.name].shape) != 1:
raise ValueError(
"Prediction results for scalar types must be one-dimensional."
)
else:
raise ValueError("Unsupported field type in return struct type.")
if len(preds[field.name]) != num_input_rows:
raise ValueError("Prediction results must have same length as input data")
elif isinstance(preds, list) and isinstance(preds[0], dict):
# rows of dictionaries
predNames = list(preds[0].keys())
if len(preds) != num_input_rows:
raise ValueError("Prediction results must have same length as input data.")
for field in struct_rtype.fields:
if isinstance(field.dataType, ArrayType):
if len(preds[0][field.name].shape) != 1:
raise ValueError(
"Prediction results for ArrayType must be one-dimensional."
)
elif isinstance(field.dataType, supported_scalar_types):
if not np.isscalar(preds[0][field.name]):
raise ValueError("Invalid scalar prediction result.")
else:
raise ValueError("Unsupported field type in return struct type.")
else:
raise ValueError(
"Prediction results for StructType must be a dictionary or "
"a list of dictionary, got: {}".format(type(preds))
)
# check column names
if set(predNames) != set(fieldNames):
raise ValueError(
"Prediction result columns did not match expected return_type "
"columns: expected {}, got: {}".format(fieldNames, predNames)
)
return pd.DataFrame(preds)
elif isinstance(return_type, ArrayType):
if isinstance(preds, np.ndarray):
if len(preds) != num_input_rows:
raise ValueError("Prediction results must have same length as input data.")
if len(preds.shape) != 2:
raise ValueError("Prediction results for ArrayType must be two-dimensional.")
else:
raise ValueError("Prediction results for ArrayType must be an ndarray.")
return pd.Series(list(preds))
elif isinstance(return_type, supported_scalar_types):
preds_array: np.ndarray = preds # type: ignore
if len(preds_array) != num_input_rows:
raise ValueError("Prediction results must have same length as input data.")
if not (
(len(preds_array.shape) == 2 and preds_array.shape[1] == 1)
or len(preds_array.shape) == 1
):
raise ValueError("Invalid shape for scalar prediction result.")
output = np.squeeze(preds_array, -1) if len(preds_array.shape) > 1 else preds_array
return pd.Series(output).astype(output.dtype)
else:
raise ValueError("Unsupported return type")
def predict_batch_udf(
make_predict_fn: Callable[
[],
PredictBatchFunction,
],
*,
return_type: DataType,
batch_size: int,
input_tensor_shapes: Optional[Union[List[Optional[List[int]]], Mapping[int, List[int]]]] = None,
) -> UserDefinedFunctionLike:
"""Given a function which loads a model and returns a `predict` function for inference over a
batch of numpy inputs, returns a Pandas UDF wrapper for inference over a Spark DataFrame.
The returned Pandas UDF does the following on each DataFrame partition:
* calls the `make_predict_fn` to load the model and cache its `predict` function.
* batches the input records as numpy arrays and invokes `predict` on each batch.
Note: this assumes that the `make_predict_fn` encapsulates all of the necessary dependencies for
running the model, or that the Spark executor environment already satisfies all runtime
requirements.
For the conversion of the Spark DataFrame to numpy arrays, there is a one-to-one mapping between
the input arguments of the `predict` function (returned by the `make_predict_fn`) and the input
columns sent to the Pandas UDF (returned by the `predict_batch_udf`) at runtime. Each input
column will be converted as follows:
* scalar column -> 1-dim np.ndarray
* tensor column + tensor shape -> N-dim np.ndarray
Note that any tensor columns in the Spark DataFrame must be represented as flattened
one-dimensional arrays, and multiple scalar columns can be combined into a single tensor column
using the standard :py:func:`pyspark.sql.functions.array()` function.
.. versionadded:: 3.4.0
Parameters
----------
make_predict_fn : callable
Function which is responsible for loading a model and returning a
:py:class:`PredictBatchFunction` which takes one or more numpy arrays as input and returns
one of the following:
* a numpy array (for a single output)
* a dictionary of named numpy arrays (for multiple outputs)
* a row-oriented list of dictionaries (for multiple outputs).
For a dictionary of named numpy arrays, the arrays can only be one- or two-dimensional, since
higher-dimensional arrays are not supported. For a row-oriented list of dictionaries, each
element in the dictionary must be either a scalar or a one-dimensional array.
return_type : :py:class:`pyspark.sql.types.DataType` or str.
Spark SQL datatype for the expected output:
* Scalar (e.g. IntegerType, FloatType) --> 1-dim numpy array.
* ArrayType --> 2-dim numpy array.
* StructType --> dict with keys matching struct fields.
* StructType --> list of dicts with keys matching struct fields, for models like the
`Huggingface pipeline for sentiment analysis
<https://huggingface.co/docs/transformers/quicktour#pipeline-usage>`_.
batch_size : int
Batch size to use for inference. This is typically a limitation of the model
and/or available hardware resources and is usually smaller than the Spark partition size.
input_tensor_shapes : list, dict, optional.
A list of tensor shapes (each a list of ints), or a dictionary mapping input indices (int)
to tensor shapes (list of ints).
Input tensor shapes for models with tensor inputs. This can be a list of shapes,
where each shape is a list of integers or None (for scalar inputs). Alternatively, this
can be represented by a "sparse" dictionary, where the keys are the integer indices of the
inputs, and the values are the shapes. Each tensor input value in the Spark DataFrame must
be represented as a single column containing a flattened 1-D array. The provided
`input_tensor_shapes` will be used to reshape the flattened array into the expected tensor
shape. For the list form, the order of the tensor shapes must match the order of the
selected DataFrame columns. The batch dimension (typically -1 or None in the first
dimension) should not be included, since it will be determined by the batch_size argument.
Tabular datasets with scalar-valued columns should not provide this argument.
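For example (illustrative), `input_tensor_shapes=[[4], [3]]` and the sparse form
`input_tensor_shapes={0: [4], 1: [3]}` describe the same model with two tensor inputs of
sizes 4 and 3.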
Returns
-------
:py:class:`UserDefinedFunctionLike`
A Pandas UDF for model inference on a Spark DataFrame.
Examples
--------
The following example uses a pre-trained TensorFlow MNIST model whose two-dimensional input
images are represented as flattened tensors stored in a single Spark DataFrame column of type
`array<float>`.
.. code-block:: python
from pyspark.ml.functions import predict_batch_udf
def make_mnist_fn():
# load/init happens once per python worker
import tensorflow as tf
model = tf.keras.models.load_model('/path/to/mnist_model')
# predict on batches of tasks/partitions, using cached model
def predict(inputs: np.ndarray) -> np.ndarray:
# inputs.shape = [batch_size, 784], see input_tensor_shapes
# outputs.shape = [batch_size, 10], see return_type
return model.predict(inputs)
return predict
mnist_udf = predict_batch_udf(make_mnist_fn,
return_type=ArrayType(FloatType()),
batch_size=100,
input_tensor_shapes=[[784]])
df = spark.read.parquet("/path/to/mnist_data")
df.show(5)
# +--------------------+
# | data|
# +--------------------+
# |[0.0, 0.0, 0.0, 0...|
# |[0.0, 0.0, 0.0, 0...|
# |[0.0, 0.0, 0.0, 0...|
# |[0.0, 0.0, 0.0, 0...|
# |[0.0, 0.0, 0.0, 0...|
# +--------------------+
df.withColumn("preds", mnist_udf("data")).show(5)
# +--------------------+--------------------+
# | data| preds|
# +--------------------+--------------------+
# |[0.0, 0.0, 0.0, 0...|[-13.511008, 8.84...|
# |[0.0, 0.0, 0.0, 0...|[-5.3957458, -2.2...|
# |[0.0, 0.0, 0.0, 0...|[-7.2014456, -8.8...|
# |[0.0, 0.0, 0.0, 0...|[-19.466187, -13....|
# |[0.0, 0.0, 0.0, 0...|[-5.7757926, -7.8...|
# +--------------------+--------------------+
To demonstrate usage with different combinations of input and output types, the following
examples use simple mathematical transforms in place of real models.
* Single scalar column
Input DataFrame has a single scalar column, which will be passed to the `predict`
function as a 1-D numpy array.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.ml.functions import predict_batch_udf
>>> from pyspark.sql.types import FloatType
>>>
>>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
>>> df.show(5)
+---+
| 0|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
only showing top 5 rows
>>> def make_times_two_fn():
... def predict(inputs: np.ndarray) -> np.ndarray:
... # inputs.shape = [batch_size]
... # outputs.shape = [batch_size]
... return inputs * 2
... return predict
...
>>> times_two_udf = predict_batch_udf(make_times_two_fn,
... return_type=FloatType(),
... batch_size=10)
>>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
>>> df.withColumn("x2", times_two_udf("0")).show(5)
+---+---+
| 0| x2|
+---+---+
| 0|0.0|
| 1|2.0|
| 2|4.0|
| 3|6.0|
| 4|8.0|
+---+---+
only showing top 5 rows
* Multiple scalar columns
Input DataFrame has multiple columns of scalar values. If the user-provided `predict`
function expects a single input, then the user must combine the multiple columns into a
single tensor using `pyspark.sql.functions.array`.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.ml.functions import predict_batch_udf
>>> from pyspark.sql.functions import array
>>>
>>> data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
>>> pdf = pd.DataFrame(data, columns=['a','b','c','d'])
>>> df = spark.createDataFrame(pdf)
>>> df.show(5)
+----+----+----+----+
| a| b| c| d|
+----+----+----+----+
| 0.0| 1.0| 2.0| 3.0|
| 4.0| 5.0| 6.0| 7.0|
| 8.0| 9.0|10.0|11.0|
|12.0|13.0|14.0|15.0|
|16.0|17.0|18.0|19.0|
+----+----+----+----+
only showing top 5 rows
>>> def make_sum_fn():
... def predict(inputs: np.ndarray) -> np.ndarray:
... # inputs.shape = [batch_size, 4]
... # outputs.shape = [batch_size]
... return np.sum(inputs, axis=1)
... return predict
...
>>> sum_udf = predict_batch_udf(make_sum_fn,
... return_type=FloatType(),
... batch_size=10,
... input_tensor_shapes=[[4]])
>>> df.withColumn("sum", sum_udf(array("a", "b", "c", "d"))).show(5)
+----+----+----+----+----+
| a| b| c| d| sum|
+----+----+----+----+----+
| 0.0| 1.0| 2.0| 3.0| 6.0|
| 4.0| 5.0| 6.0| 7.0|22.0|
| 8.0| 9.0|10.0|11.0|38.0|
|12.0|13.0|14.0|15.0|54.0|
|16.0|17.0|18.0|19.0|70.0|
+----+----+----+----+----+
only showing top 5 rows
If the `predict` function expects multiple inputs, then the number of selected input columns
must match the number of expected inputs.
>>> def make_sum_fn():
... def predict(x1: np.ndarray,
... x2: np.ndarray,
... x3: np.ndarray,
... x4: np.ndarray) -> np.ndarray:
... # xN.shape = [batch_size]
... # outputs.shape = [batch_size]
... return x1 + x2 + x3 + x4
... return predict
...
>>> sum_udf = predict_batch_udf(make_sum_fn,
... return_type=FloatType(),
... batch_size=10)
>>> df.withColumn("sum", sum_udf("a", "b", "c", "d")).show(5)
+----+----+----+----+----+
| a| b| c| d| sum|
+----+----+----+----+----+
| 0.0| 1.0| 2.0| 3.0| 6.0|
| 4.0| 5.0| 6.0| 7.0|22.0|
| 8.0| 9.0|10.0|11.0|38.0|
|12.0|13.0|14.0|15.0|54.0|
|16.0|17.0|18.0|19.0|70.0|
+----+----+----+----+----+
only showing top 5 rows
* Multiple tensor columns
Input DataFrame has multiple columns, where each column is a tensor. The number of columns
should match the number of expected inputs for the user-provided `predict` function.
>>> import numpy as np
>>> import pandas as pd
>>> from pyspark.ml.functions import predict_batch_udf
>>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
>>> from typing import Mapping
>>>
>>> data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4)
>>> pdf = pd.DataFrame(data, columns=['a','b','c','d'])
>>> pdf_tensor = pd.DataFrame()
>>> pdf_tensor['t1'] = pdf.values.tolist()
>>> pdf_tensor['t2'] = pdf.drop(columns='d').values.tolist()
>>> df = spark.createDataFrame(pdf_tensor)
>>> df.show(5)
+--------------------+------------------+
| t1| t2|
+--------------------+------------------+
|[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]|
|[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|
|[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|
|[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|
|[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|
+--------------------+------------------+
only showing top 5 rows
>>> def make_multi_sum_fn():
... def predict(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
... # x1.shape = [batch_size, 4]
... # x2.shape = [batch_size, 3]
... # outputs.shape = [batch_size]
... return np.sum(x1, axis=1) + np.sum(x2, axis=1)
... return predict
...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=FloatType(),
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
>>> df.withColumn("sum", multi_sum_udf("t1", "t2")).show(5)
+--------------------+------------------+-----+
| t1| t2| sum|
+--------------------+------------------+-----+
|[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 9.0|
|[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]| 37.0|
|[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]| 65.0|
|[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]| 93.0|
|[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|121.0|
+--------------------+------------------+-----+
only showing top 5 rows
* Multiple outputs
Some models can provide multiple outputs. These can be returned as a dictionary of named
values, which can be represented in either columnar or row-based formats.
>>> def make_multi_sum_fn():
... def predict_columnar(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
... # x1.shape = [batch_size, 4]
... # x2.shape = [batch_size, 3]
... return {
... "sum1": np.sum(x1, axis=1),
... "sum2": np.sum(x2, axis=1)
... }
... return predict_columnar
...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=StructType([
... StructField("sum1", FloatType(), True),
... StructField("sum2", FloatType(), True)
... ]),
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
>>> df.withColumn("preds", multi_sum_udf("t1", "t2")).select("t1", "t2", "preds.*").show(5)
+--------------------+------------------+----+----+
| t1| t2|sum1|sum2|
+--------------------+------------------+----+----+
|[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 6.0| 3.0|
|[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|22.0|15.0|
|[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|38.0|27.0|
|[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|54.0|39.0|
|[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|70.0|51.0|
+--------------------+------------------+----+----+
only showing top 5 rows
>>> def make_multi_sum_fn():
... def predict_row(x1: np.ndarray, x2: np.ndarray) -> list[Mapping[str, float]]:
... # x1.shape = [batch_size, 4]
... # x2.shape = [batch_size, 3]
... return [{'sum1': np.sum(x1[i]), 'sum2': np.sum(x2[i])} for i in range(len(x1))]
... return predict_row
...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=StructType([
... StructField("sum1", FloatType(), True),
... StructField("sum2", FloatType(), True)
... ]),
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
>>> df.withColumn("sum", multi_sum_udf("t1", "t2")).select("t1", "t2", "sum.*").show(5)
+--------------------+------------------+----+----+
| t1| t2|sum1|sum2|
+--------------------+------------------+----+----+
|[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]| 6.0| 3.0|
|[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|22.0|15.0|
|[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|38.0|27.0|
|[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|54.0|39.0|
|[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|70.0|51.0|
+--------------------+------------------+----+----+
only showing top 5 rows
Note that the multiple outputs can be arrays as well.
>>> def make_multi_times_two_fn():
... def predict(x1: np.ndarray, x2: np.ndarray) -> Mapping[str, np.ndarray]:
... # x1.shape = [batch_size, 4]
... # x2.shape = [batch_size, 3]
... return {"t1x2": x1 * 2, "t2x2": x2 * 2}
... return predict
...
>>> multi_times_two_udf = predict_batch_udf(
... make_multi_times_two_fn,
... return_type=StructType([
... StructField("t1x2", ArrayType(FloatType()), True),
... StructField("t2x2", ArrayType(FloatType()), True)
... ]),
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
>>> df.withColumn("x2", multi_times_two_udf("t1", "t2")).select("t1", "t2", "x2.*").show(5)
+--------------------+------------------+--------------------+------------------+
| t1| t2| t1x2| t2x2|
+--------------------+------------------+--------------------+------------------+
|[0.0, 1.0, 2.0, 3.0]| [0.0, 1.0, 2.0]|[0.0, 2.0, 4.0, 6.0]| [0.0, 2.0, 4.0]|
|[4.0, 5.0, 6.0, 7.0]| [4.0, 5.0, 6.0]|[8.0, 10.0, 12.0,...| [8.0, 10.0, 12.0]|
|[8.0, 9.0, 10.0, ...| [8.0, 9.0, 10.0]|[16.0, 18.0, 20.0...|[16.0, 18.0, 20.0]|
|[12.0, 13.0, 14.0...|[12.0, 13.0, 14.0]|[24.0, 26.0, 28.0...|[24.0, 26.0, 28.0]|
|[16.0, 17.0, 18.0...|[16.0, 17.0, 18.0]|[32.0, 34.0, 36.0...|[32.0, 34.0, 36.0]|
+--------------------+------------------+--------------------+------------------+
only showing top 5 rows
"""
# generate a new uuid each time this is invoked on the driver to invalidate executor-side cache.
model_uuid = uuid.uuid4()
def predict(data: Iterator[Union[pd.Series, pd.DataFrame]]) -> Iterator[pd.DataFrame]:
# TODO: adjust return type hint when Iterator[Union[pd.Series, pd.DataFrame]] is supported
from pyspark.ml.model_cache import ModelCache
# get predict function (from cache or from running user-provided make_predict_fn)
predict_fn = ModelCache.get(model_uuid)
if not predict_fn:
predict_fn = make_predict_fn()
ModelCache.add(model_uuid, predict_fn)
# get number of expected parameters for predict function
signature = inspect.signature(predict_fn)
num_expected_cols = len(signature.parameters)
# convert sparse input_tensor_shapes to dense if needed
input_shapes: List[List[int] | None]
if isinstance(input_tensor_shapes, Mapping):
input_shapes = [None] * num_expected_cols
for index, shape in input_tensor_shapes.items():
input_shapes[index] = shape
else:
input_shapes = input_tensor_shapes # type: ignore
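# e.g. (illustrative) a sparse dict {1: [4]} with two expected inputs yields [None, [4]]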
# iterate over pandas batch, invoking predict_fn with ndarrays
for pandas_batch in data:
has_tuple = isinstance(pandas_batch, Tuple) # type: ignore
has_tensors = _has_tensor_cols(pandas_batch)
# require input_tensor_shapes for any tensor columns
if has_tensors and not input_shapes:
raise ValueError("Tensor columns require input_tensor_shapes")
for batch in _batched(pandas_batch, batch_size):
num_input_rows = len(batch)
num_input_cols = len(batch.columns)
if num_input_cols == num_expected_cols and num_expected_cols > 1:
# input column per expected input for multiple inputs
multi_inputs = _validate_and_transform_multiple_inputs(
batch, input_shapes, num_input_cols
)
# run model prediction function on multiple (numpy) inputs
preds = predict_fn(*multi_inputs)
elif num_expected_cols == 1:
# one or more input columns for single expected input
single_input = _validate_and_transform_single_input(
batch, input_shapes, has_tensors, has_tuple
)
# run model prediction function on single (numpy) inputs
preds = predict_fn(single_input)
else:
msg = "Model expected {} inputs, but received {} columns"
raise ValueError(msg.format(num_expected_cols, num_input_cols))
# return transformed predictions to Spark
yield _validate_and_transform_prediction_result(
preds, num_input_rows, return_type
) # type: ignore
return pandas_udf(predict, return_type) # type: ignore[call-overload]
def _test() -> None:
import doctest
from pyspark.sql import SparkSession
import pyspark.ml.functions
import sys
globs = pyspark.ml.functions.__dict__.copy()
spark = SparkSession.builder.master("local[2]").appName("ml.functions tests").getOrCreate()
sc = spark.sparkContext
globs["sc"] = sc
globs["spark"] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.ml.functions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)
if __name__ == "__main__":
_test()