pyspark.SparkContext.setJobGroup

SparkContext.setJobGroup(groupId: str, description: str, interruptOnCancel: bool = False) → None

Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.

Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.

The application can use SparkContext.cancelJobGroup() to cancel all running jobs in this group.
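
For example, a minimal sketch of tagging two actions as a single unit of work (the group ID and description here are illustrative, and sc is an existing SparkContext):

>>> sc.setJobGroup("nightly_etl", "Nightly aggregation")
>>> sc.parallelize(range(100)).map(lambda x: x * 2).sum()
9900
>>> sc.parallelize(range(100)).count()
100

Both jobs run under the "nightly_etl" group and appear together in the web UI's Jobs tab.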

New in version 1.0.0.

Parameters
groupId : str

The group ID to assign.

description : str

The description to set for the job group.

interruptOnCancel : bool, optional, default False

Whether to interrupt running tasks when jobs in this group are cancelled (see Notes).

Notes

If interruptOnCancel is set to True for the job group, then job cancellation will result in Thread.interrupt() being called on the job’s executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
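
For a group whose jobs are known not to read from or write to HDFS, interruption can be enabled when the group is set; a sketch (the group ID and description are illustrative):

>>> # Illustrative only: opt into Thread.interrupt() on cancellation for a
>>> # group that performs pure computation and never touches HDFS.
>>> sc.setJobGroup("cpu_bound", "pure computation", interruptOnCancel=True)
>>> sc.cancelJobGroup("cpu_bound")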

If you run jobs in parallel, use pyspark.InheritableThread so that thread-local properties such as the job group are inherited by child threads.

Examples

>>> import threading
>>> from time import sleep
>>> from pyspark import InheritableThread
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)  # long enough for the cancellation below to arrive first
...     raise RuntimeError("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         # Jobs started by this thread are tagged with the group ID below.
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception:
...         # collect() raises once the job group is cancelled.
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)  # let the job start before cancelling it
...     sc.cancelJobGroup("job_to_cancel")
>>> # Hold the lock, start both threads, then block until start_job releases it.
>>> suppress = lock.acquire()
>>> suppress = InheritableThread(target=start_job, args=(10,)).start()
>>> suppress = InheritableThread(target=stop_job).start()
>>> suppress = lock.acquire()
>>> print(result)
Cancelled
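
The lock only sequences the example: the main thread blocks on the second acquire() until start_job releases it, which guarantees result has been assigned before it is printed. The threads are created with InheritableThread rather than threading.Thread for the thread-local inheritance described in the Notes above.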