Contributing to PySpark¶
There are many types of contributions, for example, helping other users, testing releases, reviewing changes, contributing documentation, reporting bugs, maintaining JIRA, making code changes, etc. These are documented in the general guidelines. This page focuses on PySpark and includes additional details specific to PySpark.
Contributing by Testing Releases¶
Before an official release, PySpark release candidates are shared on the dev@spark.apache.org mailing list to vote on. These release candidates can be easily installed via pip. For example, in the case of Spark 3.0.0 RC1, you can install it as below:
pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz
The link for release files, such as https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin, can be found in the vote thread.
Testing and verifying users’ existing workloads against release candidates is one of the vital contributions to PySpark, as it prevents breaking those workloads before the official release. When there is an issue serious enough to drop the release candidate, such as a regression, a correctness problem, or a performance degradation, the release candidate is usually dropped and the community focuses on fixing the issue so that it can be included in the next release candidate.
Contributing Documentation Changes¶
The release documentation is located under Spark’s docs directory. README.md describes the required dependencies and the steps to generate the documentation. Usually, the PySpark documentation is tested with the command below under the docs directory:
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch
PySpark uses Sphinx to generate its release documentation. If you want to build the PySpark documentation alone, you can build it under the python/docs directory by running:
make html
It generates the corresponding HTML files under python/docs/build/html.
Lastly, please make sure that new APIs are documented by manually adding methods and/or classes to the corresponding RST files under python/docs/source/reference. Otherwise, they will not appear in the PySpark documentation.
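For example, a newly added method could be listed with an autosummary entry like the one below. This is only an illustrative sketch: the method name is hypothetical, and the exact module and directive layout should follow the existing files under python/docs/source/reference.

```rst
.. currentmodule:: pyspark.sql

.. autosummary::
    :toctree: api/

    DataFrame.someNewMethod
```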
Preparing to Contribute Code Changes¶
Before starting to work on code in PySpark, it is recommended to read the general guidelines. Additionally, there are a couple of extra notes to keep in mind when contributing code to PySpark:
- Be Pythonic
See The Zen of Python.
- Match APIs with Scala and Java sides
Apache Spark is a unified engine that provides a consistent API layer. In general, the APIs are consistently supported across the other languages.
- PySpark-specific APIs can be accepted
As long as they are Pythonic and do not conflict with existing APIs, it is fine to raise an API request, for example, decorator usage of UDFs.
- Adjust the corresponding type hints if you extend or modify public API
See Contributing and Maintaining Type Hints for details.
If you are working on the pandas API on Spark (pyspark.pandas) package, please consider the design principles below:
- Return pandas-on-Spark data structure for big data, and pandas data structure for small data
Often developers face the question whether a particular function should return a pandas-on-Spark DataFrame/Series or a pandas DataFrame/Series. The principle is: if the returned object can be large, use a pandas-on-Spark DataFrame/Series; if the data is bound to be small, use a pandas DataFrame/Series. For example, DataFrame.dtypes returns a pandas Series, because the number of columns in a DataFrame is bounded and small, whereas DataFrame.head() or Series.unique() returns a pandas-on-Spark DataFrame/Series, because the resulting object can be large.
- Provide discoverable APIs for common data science tasks
At the risk of overgeneralization, there are two API design approaches: the first focuses on providing APIs for common tasks; the second starts with abstractions, and enables users to accomplish their tasks by composing primitives. While the world is not black and white, pandas takes more of the former approach, while Spark has taken more of the latter.
One example is value counts (counting by some key column), one of the most common operations in data science. pandas’ DataFrame.value_counts() returns the result in sorted order, which in 90% of the cases is what users prefer when exploring data, whereas Spark’s does not sort, which is more desirable when building data pipelines, as users can reproduce the pandas behavior by adding an explicit orderBy.

Similar to pandas, the pandas API on Spark should also lean more towards the former, providing discoverable APIs for common data science tasks. In most cases, this principle is well taken care of by simply implementing pandas’ APIs. However, there will be circumstances in which pandas’ APIs don’t address a specific need, e.g. plotting for big data.
- Guardrails to prevent users from shooting themselves in the foot
Certain operations in pandas are prohibitively expensive as data scales, and we don’t want to give users the illusion that they can rely on such operations in pandas API on Spark. That is to say, methods implemented in pandas API on Spark should be safe to perform by default on large datasets. As a result, the following capabilities are not implemented in pandas API on Spark:
Capabilities that are fundamentally not parallelizable: e.g. imperatively looping over each element
Capabilities that require materializing the entire working set in a single node’s memory. This is why we do not implement pandas.DataFrame.to_xarray. Another example is that the _repr_html_ call caps the total number of records shown to a maximum of 1000, to prevent users from blowing up their driver node simply by typing the name of the DataFrame in a notebook.
A few exceptions, however, exist. One common pattern with “big data science” is that while the initial dataset is large, the working set becomes smaller as the analysis goes deeper. For example, data scientists often perform aggregation on datasets and want to then convert the aggregated dataset to some local data structure. To help data scientists, we offer the following:
DataFrame.to_pandas: returns a pandas DataFrame (pandas-on-Spark only)

DataFrame.to_numpy: returns a numpy array, works with both pandas and the pandas API on Spark
Note that it is clear from the names that these functions return some local data structure that would require materializing data in a single node’s memory. For these functions, we also explicitly document them with a warning note that the resulting data structure must be small.
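The display-capping guardrail described above can be sketched in plain Python. This is illustrative only: the class name and the cap constant are hypothetical, not the actual pandas-on-Spark implementation, which mirrors the 1000-record limit mentioned earlier.

```python
MAX_DISPLAY_ROWS = 1000  # hypothetical cap, mirroring the 1000-record limit described above


class CappedFrame:
    """Illustrative only: renders at most MAX_DISPLAY_ROWS rows in a notebook."""

    def __init__(self, rows):
        self._rows = rows

    def _repr_html_(self):
        # Render only up to the cap, never the full dataset, so typing the
        # object's name in a notebook cannot blow up the driver node.
        shown = self._rows[:MAX_DISPLAY_ROWS]
        body = "".join("<tr><td>%s</td></tr>" % r for r in shown)
        note = ""
        if len(self._rows) > MAX_DISPLAY_ROWS:
            note = "<p>Showing only the first %d rows</p>" % MAX_DISPLAY_ROWS
        return "<table>%s</table>%s" % (body, note)
```

The key design point is that the expensive part (materializing rows) is bounded before any rendering happens, rather than truncating after the fact.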
Environment Setup¶
Prerequisite¶
PySpark development requires building Spark, which in turn requires a proper JDK installation, etc. See Building Spark for more details.
Note that if you intend to contribute to Spark Connect in Python, buf version 1.24.0 is required; see Buf Installation for more details.
Conda¶
If you are using Conda, the development environment can be set up as follows.
# Python 3.8+ is required
conda create --name pyspark-dev-env python=3.9
conda activate pyspark-dev-env
pip install --upgrade -r dev/requirements.txt
Once it is set up, make sure you switch to pyspark-dev-env before starting the development:
conda activate pyspark-dev-env
Now, you can start developing and running the tests.
pip¶
With Python 3.8+, pip can be used as below to install and set up the development environment.
pip install --upgrade -r dev/requirements.txt
Now, you can start developing and running the tests.
Contributing and Maintaining Type Hints¶
PySpark type hints are inlined, to take advantage of static type checking.
As a rule of thumb, only public API is annotated.
Annotations should, when possible:
Reflect expectations of the underlying JVM API, to help avoid type-related failures outside the Python interpreter.
In case of conflict between too broad (Any) and too narrow argument annotations, prefer the latter, as long as it covers most of the typical use cases.

Indicate nonsensical combinations of arguments using @overload annotations. For example, to indicate that *Col and *Cols arguments are mutually exclusive:

@overload
def __init__(
    self,
    *,
    threshold: float = ...,
    inputCol: Optional[str] = ...,
    outputCol: Optional[str] = ...
) -> None: ...
@overload
def __init__(
    self,
    *,
    thresholds: Optional[List[float]] = ...,
    inputCols: Optional[List[str]] = ...,
    outputCols: Optional[List[str]] = ...
) -> None: ...
Be compatible with the current stable MyPy release.
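At runtime, overloads like the ones above are backed by a single implementation; the overload bodies are just `...` stubs that guide mypy. A minimal self-contained sketch follows, using a hypothetical class (not actual PySpark code) only to show how the pieces fit together:

```python
from typing import Dict, List, Optional, overload


class Binarizer:  # hypothetical class, for illustration only
    @overload
    def __init__(self, *, threshold: float = ...,
                 inputCol: Optional[str] = ...,
                 outputCol: Optional[str] = ...) -> None: ...

    @overload
    def __init__(self, *, thresholds: Optional[List[float]] = ...,
                 inputCols: Optional[List[str]] = ...,
                 outputCols: Optional[List[str]] = ...) -> None: ...

    def __init__(self, **kwargs: object) -> None:
        # The single runtime implementation. The overloads above are ignored
        # at runtime; mypy uses them to flag calls that mix, for example,
        # inputCol with outputCols.
        self._params: Dict[str, object] = dict(kwargs)


b = Binarizer(threshold=0.5, inputCol="values", outputCol="flags")
```

A call such as `Binarizer(threshold=0.5, outputCols=["a"])` would still run, but mypy would reject it because it matches neither overload.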
Complex supporting type definitions should be placed in dedicated _typing.pyi
stubs. See for example pyspark.sql._typing.pyi.
Annotations can be validated using the dev/lint-python script or by invoking mypy directly:
mypy --config python/mypy.ini python/pyspark
Code and Docstring Guide¶
Code Conventions¶
Please follow the style of the existing codebase as is, which is virtually PEP 8 with one exception: lines can be up to 100 characters in length, not 79.
Note that:
- The method and variable names in PySpark are a case similar to the threading library in Python itself, whose APIs were inspired by Java. PySpark also follows camelCase for exposed APIs that match with Scala and Java.
- In contrast, functions.py uses snake_case in order to make its APIs SQL (and Python) friendly.
- In addition, pandas-on-Spark (pyspark.pandas) also uses snake_case because this package does not need to keep API consistency with other languages.
PySpark leverages linters such as pycodestyle and flake8, which dev/lint-python runs. Therefore, make sure to run that script to double-check.
Docstring Conventions¶
PySpark follows NumPy documentation style.
Doctest Conventions¶
In general, doctests should be grouped logically, with the groups separated by a blank line.
For instance, the first block is for the preparation statements, the second block is for using the function with a specific argument, and the third block is for another argument. As an example, please refer to DataFrame.rsub in pandas.
These blocks should be consistently separated in PySpark doctests, and more doctests should be added if the coverage of the doctests or the number of examples shown is not sufficient.
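A doctest grouped into such blocks might look like the following. The function is hypothetical and serves only to illustrate the blank-line grouping: preparation first, then one argument, then another.

```python
def repeat(value, times):
    """Return a list containing ``value`` repeated ``times`` times.

    Examples
    --------
    Prepare the input data.

    >>> value = "a"

    Use the function with a specific argument.

    >>> repeat(value, 2)
    ['a', 'a']

    Another argument.

    >>> repeat(value, 0)
    []
    """
    return [value] * times
```

Each blank-line-separated block reads as one logical step, which keeps long doctests scannable.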
Contributing Error and Exception¶
To throw a standardized user-facing error or exception, developers should specify the error class and message parameters rather than an arbitrary error message.
Usage¶
1. Check if an appropriate error class already exists in error_classes.py. If so, use the error class and skip to step 3.
2. Add a new class to error_classes.py; keep in mind the invariants below.
3. Check if the exception type already extends PySparkException. If so, skip to step 5.
4. Mix PySparkException into the exception.
5. Throw the exception with the error class and message parameters.
Before
Throw with arbitrary error message:
raise ValueError("Problem A because B")
After
error_classes.py
"PROBLEM_BECAUSE": {
"message": ["Problem <problem> because <cause>"]
}
exceptions.py
class PySparkTestError(PySparkException):
def __init__(self, error_class: str, message_parameters: Dict[str, str]):
super().__init__(error_class=error_class, message_parameters=message_parameters)
def getMessageParameters(self) -> Optional[Dict[str, str]]:
return super().getMessageParameters()
Throw with error class and message parameters:
raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})
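To illustrate how the placeholders in a message template get filled from the message parameters, here is a rough pure-Python sketch. This is not Spark's actual implementation; the function name is hypothetical and the placeholder style follows the error_classes.py example above.

```python
import re


def format_error_message(template, params):
    # Replace each <name> placeholder in the template with the corresponding
    # message parameter. Illustrative sketch only; PySpark's real formatting
    # lives inside its error framework.
    return re.sub(r"<(\w+)>", lambda m: str(params[m.group(1)]), template)


message = format_error_message(
    "Problem <problem> because <cause>",
    {"problem": "A", "cause": "B"},
)
```

With the parameters from the example above, this yields the final user-facing message from the template.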
Access fields¶
To access error fields, catch exceptions that extend PySparkException and access the error class with PySparkException.getErrorClass().
try:
...
except PySparkException as pe:
if pe.getErrorClass() == "PROBLEM_BECAUSE":
...
Fields¶
Error class
Error classes are a succinct, human-readable representation of the error category.
An uncategorized error can be assigned to a legacy error class with the prefix _LEGACY_ERROR_TEMP_ and an unused sequential number, for instance _LEGACY_ERROR_TEMP_0053.
Invariants:
Unique
Consistent across releases
Sorted alphabetically
Message
Error messages provide a descriptive, human-readable representation of the error. The message format accepts string parameters via the C-style printf syntax.
The quality of the error message should match the Apache Spark Error Message Guidelines.
Invariants:
Unique