#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable, Generic, Optional, Type, Union, TYPE_CHECKING
import warnings
from pyspark.pandas._typing import T
if TYPE_CHECKING:
from pyspark.pandas.frame import DataFrame # noqa: F401 (SPARK-34943)
from pyspark.pandas.indexes import Index # noqa: F401 (SPARK-34943)
from pyspark.pandas.series import Series # noqa: F401 (SPARK-34943)
class CachedAccessor(Generic[T]):
"""
Custom property-like object.
A descriptor for caching accessors:
Parameters
----------
name : str
Namespace that accessor's methods, properties, etc will be accessed under, e.g. "foo" for a
dataframe accessor yields the accessor ``df.foo``
accessor: cls
Class with the extension methods.
Notes
-----
For accessor, the class's __init__ method assumes that you are registering an accessor for one
of ``Series``, ``DataFrame``, or ``Index``.
This object is not meant to be instantiated directly. Instead, use register_dataframe_accessor,
register_series_accessor, or register_index_accessor.
The pandas-on-Spark accessor is modified based on pandas.core.accessor.
"""
def __init__(self, name: str, accessor: Type[T]) -> None:
self._name = name
self._accessor = accessor
def __get__(
self, obj: Optional[Union["DataFrame", "Series", "Index"]], cls: Type[T]
) -> Union[T, Type[T]]:
if obj is None:
return self._accessor
accessor_obj = self._accessor(obj) # type: ignore
object.__setattr__(obj, self._name, accessor_obj)
return accessor_obj
def _register_accessor(
name: str, cls: Union[Type["DataFrame"], Type["Series"], Type["Index"]]
) -> Callable[[Type[T]], Type[T]]:
"""
Register a custom accessor on {klass} objects.
Parameters
----------
name : str
Name under which the accessor should be registered. A warning is issued if this name
conflicts with a preexisting attribute.
Returns
-------
callable
A class decorator.
See Also
--------
register_dataframe_accessor: Register a custom accessor on DataFrame objects
register_series_accessor: Register a custom accessor on Series objects
register_index_accessor: Register a custom accessor on Index objects
Notes
-----
When accessed, your accessor will be initialiazed with the pandas-on-Spark object the user
is interacting with. The code signature must be:
.. code-block:: python
def __init__(self, pandas_on_spark_obj):
# constructor logic
...
In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
frequently used to annotate when a value's datatype is unexpected for a given method/function.
Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
something like this:
>>> ps.Series(['a', 'b']).dt
...
Traceback (most recent call last):
...
ValueError: Cannot call DatetimeMethods on type StringType
Note: This function is not meant to be used directly - instead, use register_dataframe_accessor,
register_series_accessor, or register_index_accessor.
"""
def decorator(accessor: Type[T]) -> Type[T]:
if hasattr(cls, name):
msg = (
"registration of accessor {0} under name '{1}' for type {2} is overriding "
"a preexisting attribute with the same name.".format(accessor, name, cls.__name__)
)
warnings.warn(
msg,
UserWarning,
stacklevel=2,
)
setattr(cls, name, CachedAccessor(name, accessor))
return accessor
return decorator
[docs]def register_dataframe_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
"""
Register a custom accessor with a DataFrame
Parameters
----------
name : str
name used when calling the accessor after its registered
Returns
-------
callable
A class decorator.
See Also
--------
register_series_accessor: Register a custom accessor on Series objects
register_index_accessor: Register a custom accessor on Index objects
Notes
-----
When accessed, your accessor will be initialiazed with the pandas-on-Spark object the user
is interacting with. The accessor's init method should always ingest the object being accessed.
See the examples for the init signature.
In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
frequently used to annotate when a value's datatype is unexpected for a given method/function.
Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
something like this:
>>> ps.Series(['a', 'b']).dt
...
Traceback (most recent call last):
...
ValueError: Cannot call DatetimeMethods on type StringType
Examples
--------
In your library code::
from pyspark.pandas.extensions import register_dataframe_accessor
@register_dataframe_accessor("geo")
class GeoAccessor:
def __init__(self, pandas_on_spark_obj):
self._obj = pandas_on_spark_obj
# other constructor logic
@property
def center(self):
# return the geographic center point of this DataFrame
lat = self._obj.latitude
lon = self._obj.longitude
return (float(lon.mean()), float(lat.mean()))
def plot(self):
# plot this array's data on a map
pass
Then, in an ipython session::
>>> ## Import if the accessor is in the other file.
>>> # from my_ext_lib import GeoAccessor
>>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
... "latitude": np.linspace(0, 20)})
>>> psdf.geo.center # doctest: +SKIP
(5.0, 10.0)
>>> psdf.geo.plot() # doctest: +SKIP
"""
from pyspark.pandas import DataFrame
return _register_accessor(name, DataFrame)
[docs]def register_series_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
"""
Register a custom accessor with a Series object
Parameters
----------
name : str
name used when calling the accessor after its registered
Returns
-------
callable
A class decorator.
See Also
--------
register_dataframe_accessor: Register a custom accessor on DataFrame objects
register_index_accessor: Register a custom accessor on Index objects
Notes
-----
When accessed, your accessor will be initialiazed with the pandas-on-Spark object the user is
interacting with. The code signature must be::
def __init__(self, pandas_on_spark_obj):
# constructor logic
...
In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
frequently used to annotate when a value's datatype is unexpected for a given method/function.
Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
something like this:
>>> ps.Series(['a', 'b']).dt
...
Traceback (most recent call last):
...
ValueError: Cannot call DatetimeMethods on type StringType
Examples
--------
In your library code::
from pyspark.pandas.extensions import register_series_accessor
@register_series_accessor("geo")
class GeoAccessor:
def __init__(self, pandas_on_spark_obj):
self._obj = pandas_on_spark_obj
@property
def is_valid(self):
# boolean check to see if series contains valid geometry
return True
Then, in an ipython session::
>>> ## Import if the accessor is in the other file.
>>> # from my_ext_lib import GeoAccessor
>>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
... "latitude": np.linspace(0, 20)})
>>> psdf.longitude.geo.is_valid # doctest: +SKIP
True
"""
from pyspark.pandas import Series
return _register_accessor(name, Series)
[docs]def register_index_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
"""
Register a custom accessor with an Index
Parameters
----------
name : str
name used when calling the accessor after its registered
Returns
-------
callable
A class decorator.
See Also
--------
register_dataframe_accessor: Register a custom accessor on DataFrame objects
register_series_accessor: Register a custom accessor on Series objects
Notes
-----
When accessed, your accessor will be initialiazed with the pandas-on-Spark object the user is
interacting with. The code signature must be::
def __init__(self, pandas_on_spark_obj):
# constructor logic
...
In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
frequently used to annotate when a value's datatype is unexpected for a given method/function.
Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
something like this:
>>> ps.Series(['a', 'b']).dt
...
Traceback (most recent call last):
...
ValueError: Cannot call DatetimeMethods on type StringType
Examples
--------
In your library code::
from pyspark.pandas.extensions import register_index_accessor
@register_index_accessor("foo")
class CustomAccessor:
def __init__(self, pandas_on_spark_obj):
self._obj = pandas_on_spark_obj
self.item = "baz"
@property
def bar(self):
# return item value
return self.item
Then, in an ipython session::
>>> ## Import if the accessor is in the other file.
>>> # from my_ext_lib import CustomAccessor
>>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
... "latitude": np.linspace(0, 20)})
>>> psdf.index.foo.bar # doctest: +SKIP
'baz'
"""
from pyspark.pandas import Index
return _register_accessor(name, Index)
def _test() -> None:
import os
import doctest
import sys
import numpy
from pyspark.sql import SparkSession
import pyspark.pandas.extensions
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.pandas.extensions.__dict__.copy()
globs["np"] = numpy
globs["ps"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.extensions tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.extensions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)
if __name__ == "__main__":
_test()