pyspark.sql.functions.session_window
Generates a session window given a column that specifies the timestamp. A session window is a dynamic window, which means its length varies according to the given inputs. The end of a session window is defined as “the timestamp of the latest input of the session + gap duration”, so when new inputs are bound to the current session window, its end time can be extended according to the new inputs.
Windows can support microsecond precision. Windows in the order of months are not supported. For a streaming query, you may use the function current_timestamp to generate windows on processing time.
gapDuration is provided as a string, e.g. ‘1 second’, ‘1 day 12 hours’, ‘2 minutes’. Valid interval units are ‘week’, ‘day’, ‘hour’, ‘minute’, ‘second’, ‘millisecond’, ‘microsecond’. It can also be a Column that is evaluated to a gap duration dynamically based on the input row.
The output column will be a struct called ‘session_window’ by default with the nested columns ‘start’ and ‘end’, where ‘start’ and ‘end’ will be of pyspark.sql.types.TimestampType.
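The session-end rule described above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: the helper name `sessionize` is hypothetical, and treating an event that lands exactly on the current session end as the start of a new session is an assumption of this sketch.

```python
from datetime import datetime, timedelta


def sessionize(timestamps, gap):
    """Group timestamps into (start, end, count) sessions.

    Each event opens a window [ts, ts + gap). An event that falls before the
    end of the current session joins it and may push the end further out.
    Assumption: an event exactly at the current end starts a new session.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            start, end, count = sessions[-1]
            # Extend the session end to "latest input + gap duration".
            sessions[-1] = (start, max(end, ts + gap), count + 1)
        else:
            # No open session covers this event, so start a new one.
            sessions.append((ts, ts + gap, 1))
    return sessions


events = [
    datetime(2016, 3, 11, 9, 0, 7),
    datetime(2016, 3, 11, 9, 0, 10),
    datetime(2016, 3, 11, 9, 0, 30),
]
sessions = sessionize(events, timedelta(seconds=5))
for start, end, count in sessions:
    print(start, end, count)
```

With a 5 second gap, the first two events share one session because the second arrives before the first window closes, while the third event starts a session of its own.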
New in version 3.2.0.
Changed in version 3.4.0: Supports Spark Connect.
Parameters
timeColumn : Column or str
    The column name or column to use as the timestamp for windowing by time. The time column must be of TimestampType or TimestampNTZType.
gapDuration : Column or str
    A Python string literal or column specifying the timeout of the session. It can be a static value, e.g. ‘10 minutes’, ‘1 second’, or an expression/UDF that specifies the gap duration dynamically based on the input row.
Returns
Column
    The column for computed results.
Examples
>>> from pyspark.sql.functions import lit, session_window, sum
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> # Gap duration given as a string literal.
>>> w = df.groupBy(session_window("date", "5 seconds")).agg(sum("val").alias("sum"))
>>> w.select(w.session_window.start.cast("string").alias("start"),
...          w.session_window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:00:12', sum=1)]
>>> # Gap duration given as a Column.
>>> w = df.groupBy(session_window("date", lit("5 seconds"))).agg(sum("val").alias("sum"))
>>> w.select(w.session_window.start.cast("string").alias("start"),
...          w.session_window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:00:12', sum=1)]