Streaming Watermark
Streaming Watermark (allowed lateness) of a stateful streaming query is a moving event-time threshold. It specifies how long to wait for late and possibly out-of-order events before a streaming state is considered final and never changes again. This lets the engine automatically drop late incoming data (based on event time) and clean up old state accordingly.
Streaming watermark is used to mark events (modeled as rows in a streaming query) that are older than the watermark threshold as "too late", and so not "interesting" enough to update partial non-final streaming state (of an aggregation or a join).
In Spark Structured Streaming, streaming watermark is defined using Dataset.withWatermark high-level operator.
withWatermark(
  eventTime: String,
  delayThreshold: String): Dataset[T]
In Dataset.withWatermark operator, eventTime is the name of the column used to track event time, whereas delayThreshold is the delay threshold given as an interval string (e.g. "10 minutes").
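As a minimal sketch, assuming a local SparkSession and the built-in rate source (which produces timestamp and value columns), a watermark could be defined as follows. The application name and the 10-minute threshold are illustrative choices only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-sketch")
  .getOrCreate()

// The rate source generates rows with `timestamp` and `value` columns
val events = spark.readStream
  .format("rate")
  .load()

// `timestamp` is the event-time column; "10 minutes" is the delay threshold
val watermarked = events.withWatermark("timestamp", "10 minutes")
```

The operator only annotates the streaming Dataset; nothing is executed until a streaming query is started.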
Watermark Threshold (Delay Threshold) says how late (and possibly out-of-order) events can be and still be accepted and contribute to the final result of a streaming state. The event-time watermark delay is subtracted from the latest event time seen so far, which gives a point in time in the past.
Event-Time Watermark is then a time threshold (point in time) that is the minimum acceptable time of an event that is accepted in a streaming state.
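The relationship between the delay threshold and the event-time watermark can be illustrated with plain Scala (no Spark involved). This is a simplified model that assumes the watermark is the maximum event time seen so far minus the delay threshold, and that an event at or before the watermark counts as too late. The names watermarkOf and isTooLate are hypothetical.

```scala
import java.time.{Duration, Instant}

// Simplified model: watermark = max event time seen so far - delay threshold
def watermarkOf(eventTimes: Seq[Instant], delay: Duration): Instant = {
  val maxEventTime = eventTimes.reduce((a, b) => if (a.isAfter(b)) a else b)
  maxEventTime.minus(delay)
}

// Assumption: an event at or before the watermark is considered too late
def isTooLate(eventTime: Instant, watermark: Instant): Boolean =
  !eventTime.isAfter(watermark)

val seen = Seq(
  Instant.parse("2024-01-01T12:00:00Z"),
  Instant.parse("2024-01-01T12:07:00Z"),
  Instant.parse("2024-01-01T12:03:00Z"))

// 12:07 minus 10 minutes gives a watermark of 11:57
val watermark = watermarkOf(seen, Duration.ofMinutes(10))

val lateEvent = isTooLate(Instant.parse("2024-01-01T11:55:00Z"), watermark)     // true
val acceptedEvent = isTooLate(Instant.parse("2024-01-01T11:58:00Z"), watermark) // false
```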
With streaming watermark, memory usage of a streaming state can be kept under control, as late events are dropped and old state that is never going to be updated is removed. This avoids unbounded streaming state that would inevitably use up all the available memory of long-running streaming queries and end up in out-of-memory errors.
In Append output mode the current event-time streaming watermark is used for the following:
- Outputting saved state rows that have expired (Expired events in the demo)
- Dropping late events, i.e. not saving them to a state store or including them in an aggregation (Late events in the demo)
Streaming watermark is required for a streaming aggregation in Append output mode.
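The two uses of the watermark in Append output mode can be modeled in plain Scala. The following is a hypothetical, heavily simplified sketch of a windowed count (it is not how Spark implements state management): times are plain seconds, windows are 10 seconds wide, and Window, windowOf and processBatch are made-up names.

```scala
// Hypothetical model of Append mode for a windowed count
final case class Window(start: Long, end: Long)

val windowSize = 10L

def windowOf(eventTime: Long): Window = {
  val start = eventTime - (eventTime % windowSize)
  Window(start, start + windowSize)
}

// Returns (state kept for later batches, rows emitted in this batch)
def processBatch(
    state: Map[Window, Long],
    eventTimes: Seq[Long],
    watermark: Long): (Map[Window, Long], Map[Window, Long]) = {
  // 1. Drop late events: their window ended at or before the watermark
  val accepted = eventTimes.filter(t => windowOf(t).end > watermark)
  // 2. Update the partial (non-final) counts
  val updated = accepted.foldLeft(state) { (s, t) =>
    val w = windowOf(t)
    s.updated(w, s.getOrElse(w, 0L) + 1)
  }
  // 3. Emit the expired windows and evict them from the state
  val (expired, kept) = updated.partition { case (w, _) => w.end <= watermark }
  (kept, expired)
}

// Batch 1: nothing has expired yet, so nothing is emitted
val (state1, out1) = processBatch(Map.empty, Seq(1L, 5L, 12L), watermark = 0L)

// Batch 2: event 3 is late (window [0, 10) ended before 15) and is dropped;
// window [0, 10) is emitted with its final count of 2
val (state2, out2) = processBatch(state1, Seq(3L, 25L), watermark = 15L)
```

Each window is emitted exactly once, after the watermark has passed its end, which is why a result in Append mode can be treated as final.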
Streaming Aggregation
In streaming aggregation, a streaming watermark has to be defined on one or more grouping expressions of the aggregation (directly or using the window standard function).
Note
Dataset.withWatermark operator has to be used before an aggregation operator (for the watermark to have an effect).
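A sketch of a windowed streaming aggregation with a watermark, assuming a local SparkSession and the rate source (the durations and the console sink are arbitrary choices for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

// Assumption: a local session for experimentation only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-aggregation-sketch")
  .getOrCreate()

val counts = spark.readStream
  .format("rate")
  .load()
  .withWatermark("timestamp", "10 minutes")       // defined before the aggregation
  .groupBy(window(col("timestamp"), "5 minutes")) // grouping uses the watermarked column
  .count()

// Append output mode relies on the watermark defined above.
// Call start() on the writer to actually run the query.
val writer = counts.writeStream
  .format("console")
  .outputMode("append")
```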
Streaming Join
In streaming join, a streaming watermark can be defined on join keys or any of the join sides.
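The following sketch shows one possible stream-stream join with a watermark on each side, assuming a local SparkSession. The two inputs are rate sources with renamed columns, and the column names, thresholds and time-range condition are all illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Assumption: a local session for experimentation only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-join-sketch")
  .getOrCreate()

// Left side with its own event-time column and watermark
val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "2 hours")

// Right side with its own event-time column and watermark
val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")
  .withWatermark("clickTime", "3 hours")

// Inner join on the key with an event-time range condition
val joined = impressions.join(
  clicks,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """))
```

With watermarks on both sides and a time-range condition, the engine has enough information to decide when buffered rows of either side can no longer find a match.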
Demos
Use the following demos to learn more:
Internals
Under the covers, Dataset.withWatermark high-level operator creates a logical query plan with EventTimeWatermark logical operator.
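One way to see the logical operator is to print the logical plan of a watermarked streaming Dataset. This sketch assumes a local SparkSession and the rate source; the exact rendering of the plan differs between Spark versions, so no output is shown.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation only
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-plan-sketch")
  .getOrCreate()

val watermarked = spark.readStream
  .format("rate")
  .load()
  .withWatermark("timestamp", "10 minutes")

// The logical plan should contain an EventTimeWatermark node
val logicalPlan = watermarked.queryExecution.logical
println(logicalPlan.numberedTreeString)
```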
EventTimeWatermark logical operator is planned to EventTimeWatermarkExec physical operator that extracts the event time values (from the rows processed) and adds them to an accumulator.
Since the execution (data processing) happens on Spark executors, using the accumulator is the only Spark-approved way for communication between the tasks (on the executors) and the driver. Through the accumulator, the event time values reach the driver, which uses them to update the current event-time watermark.
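To illustrate the mechanism, here is a hypothetical accumulator built on Spark's public AccumulatorV2 API that keeps only the maximum event time (in milliseconds). It is not the accumulator that EventTimeWatermarkExec uses internally; MaxEventTimeAccum is a made-up name for this sketch.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator: tracks the maximum event time (ms) seen by tasks
class MaxEventTimeAccum(private var max: Long = Long.MinValue)
    extends AccumulatorV2[Long, Long] {
  override def isZero: Boolean = max == Long.MinValue
  override def copy(): MaxEventTimeAccum = new MaxEventTimeAccum(max)
  override def reset(): Unit = max = Long.MinValue
  // Called for every event time extracted from a row
  override def add(v: Long): Unit = max = math.max(max, v)
  // Called to combine the partial results of two accumulators
  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    max = math.max(max, other.value)
  override def value: Long = max
}

// Simulating two tasks and the merge of their partial results
val task1 = new MaxEventTimeAccum()
task1.add(10L)
task1.add(7L)

val task2 = new MaxEventTimeAccum()
task2.add(42L)

task1.merge(task2)
val maxEventTime = task1.value // 42
```

In a real application such an accumulator would be registered with the SparkContext so that Spark ships task-side updates back to the driver.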
During the query planning phase (in MicroBatchExecution and ContinuousExecution) that also happens on the driver, IncrementalExecution is given the current OffsetSeqMetadata with the current event-time watermark.
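OffsetSeqMetadata is internal, but the watermark the driver currently holds can be observed through the public StreamingQueryProgress API. The helper below is a small sketch (currentWatermark is a hypothetical name) that reads the "watermark" entry of the event-time statistics of the most recent progress update.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Returns the watermark reported in the most recent progress update, if any.
// lastProgress is null until the first progress update is available, and
// the "watermark" key is absent for queries without an event-time watermark.
def currentWatermark(query: StreamingQuery): Option[String] =
  Option(query.lastProgress)
    .flatMap(progress => Option(progress.eventTime.get("watermark")))
```

Calling currentWatermark on a running query repeatedly should show the value moving forward as newer events arrive.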