EventTimeWatermarkExec Physical Operator¶
EventTimeWatermarkExec is a unary physical operator that represents the EventTimeWatermark logical operator at execution time.
Tip
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.
Learn more about Unary Physical Operators (and physical operators in general) in The Internals of Spark SQL online book.
EventTimeWatermarkExec operator is used to extract (project) the values of the event-time watermark column and add them all to the EventTimeStatsAccum accumulator (and produce an EventTimeStats).
Creating Instance¶
EventTimeWatermarkExec takes the following to be created:
- Catalyst Attribute for event time (Spark SQL)
- Delay Interval (Spark SQL)
- Child Physical Operator (Spark SQL)
When created, EventTimeWatermarkExec registers the EventTimeStatsAccum accumulator (with the current SparkContext).
EventTimeWatermarkExec is created when the StatefulAggregationStrategy execution planning strategy is executed (requested to plan an EventTimeWatermark logical operator for execution).
EventTimeStats Accumulator¶
eventTimeStats: EventTimeStatsAccum
EventTimeWatermarkExec creates an EventTimeStatsAccum accumulator when created.
When executed, EventTimeWatermarkExec uses the EventTimeStatsAccum to extract and accumulate eventTime values (as Longs) from every row in a streaming batch.
Note
Since the execution (data processing) happens on Spark executors, the only way to establish communication between the tasks (on the executors) and the driver is to use the accumulator facility.
Learn more about Accumulators in The Internals of Apache Spark online book.
eventTimeStats is registered (with the current SparkContext) when EventTimeWatermarkExec is created. eventTimeStats is an unnamed accumulator.
eventTimeStats is used to transfer the statistics (maximum, minimum, average and update count) of the long values in the event-time watermark column to be used for the following:
- ProgressReporter is requested for the most recent execution statistics (for max, min, avg, and watermark event-time watermark statistics)
- WatermarkTracker is requested to updateWatermark
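The statistics described above (maximum, minimum, average and update count) can be modelled in plain Scala. What follows is a minimal sketch, not Spark's implementation: `StatsSketch`, `candidateWatermarkMs` and `StatsSketchDemo` are hypothetical names, and the rule "candidate watermark = maximum event time minus delay" is stated here as an assumption about how the maximum is consumed downstream.

```scala
// Hypothetical model of event-time statistics; names are illustrative, not Spark's.
final case class StatsSketch(max: Long, min: Long, avg: Double, count: Long) {

  // Fold a single event-time value (in milliseconds) into the statistics.
  def add(eventTimeMs: Long): StatsSketch = {
    val newCount = count + 1
    StatsSketch(
      max = math.max(max, eventTimeMs),
      min = math.min(min, eventTimeMs),
      avg = avg + (eventTimeMs - avg) / newCount, // incremental mean
      count = newCount)
  }

  // Combine partial statistics computed by two tasks,
  // which is what an accumulator merge on the driver has to do.
  def merge(that: StatsSketch): StatsSketch =
    if (that.count == 0) this
    else if (count == 0) that
    else {
      val total = count + that.count
      StatsSketch(
        max = math.max(max, that.max),
        min = math.min(min, that.min),
        avg = avg + (that.avg - avg) * that.count / total, // weighted mean
        count = total)
    }
}

object StatsSketch {
  // Neutral element: no values seen yet.
  val zero: StatsSketch = StatsSketch(Long.MinValue, Long.MaxValue, 0.0, 0L)

  // Assumed rule: candidate watermark = max event time - delay (both in milliseconds).
  // Returns None when no rows were seen, since max is then meaningless.
  def candidateWatermarkMs(stats: StatsSketch, delayMs: Long): Option[Long] =
    if (stats.count > 0) Some(stats.max - delayMs) else None
}

object StatsSketchDemo {
  // Two tasks each accumulate their own partition's event times...
  val task1: StatsSketch = Seq(1000L, 4000L).foldLeft(StatsSketch.zero)((s, t) => s.add(t))
  val task2: StatsSketch = Seq(7000L).foldLeft(StatsSketch.zero)((s, t) => s.add(t))
  // ...and the partial results are merged into one value.
  val merged: StatsSketch = task1.merge(task2)
  val watermark: Option[Long] = StatsSketch.candidateWatermarkMs(merged, delayMs = 2000L)
}
```

The sketch keeps `add` and `merge` separate because tasks only ever see their own partition; the merged value is the only place where statistics for the whole batch exist.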
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions).
doExecute creates an unsafe projection (per partition) for the column with the event time in the output schema of the child physical operator. The unsafe projection is to extract event times from the (stream of) internal rows of the child physical operator.
For every row in a partition, doExecute requests the eventTimeStats accumulator to accumulate the event time.
Note
The event time value is in milliseconds (Spark SQL stores timestamps internally as microseconds, and the value is divided by 1000).
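The per-partition processing described in this section can be imitated with a plain Scala Iterator. This is a rough sketch under stated assumptions: `RowSketch`, `MaxAccumulator` and `processPartition` are hypothetical stand-ins (not Spark classes), and the raw event time is assumed to be microseconds since the epoch, in which case dividing by 1000 yields milliseconds.

```scala
object DoExecuteSketch {
  // Hypothetical stand-in for an internal row: event time (microseconds) plus a payload.
  final case class RowSketch(eventTimeMicros: Long, payload: String)

  // Hypothetical stand-in for the accumulator; tracks only what the demo needs.
  final class MaxAccumulator {
    var max: Long = Long.MinValue
    var count: Long = 0L
    def add(v: Long): Unit = { max = math.max(max, v); count += 1 }
  }

  // Per-partition function: record every row's event time and pass the row through unchanged.
  def processPartition(rows: Iterator[RowSketch], acc: MaxAccumulator): Iterator[RowSketch] = {
    // Plays the role of the projection: built once per partition, applied to every row.
    val getEventTime: RowSketch => Long = _.eventTimeMicros
    rows.map { row =>
      acc.add(Math.floorDiv(getEventTime(row), 1000L)) // assumed microseconds -> milliseconds
      row
    }
  }
}

object DoExecuteDemo {
  import DoExecuteSketch._

  val acc = new MaxAccumulator
  val input: List[RowSketch] = List(RowSketch(1500000L, "a"), RowSketch(3250999L, "b"))

  val lazyOut: Iterator[RowSketch] = processPartition(input.iterator, acc)
  // Iterator.map is lazy, so nothing has been accumulated yet.
  val countBeforeConsuming: Long = acc.count
  // Consuming the iterator drives the accumulation as a side effect.
  val output: List[RowSketch] = lazyOut.toList
}
```

Note that in this model the rows are not transformed at all; the statistics are gathered purely as a side effect of the rows flowing through the iterator.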
Output Attributes¶
output: Seq[Attribute]
output is part of the QueryPlan (Spark SQL) abstraction.
output requests the child physical operator for the output attributes to find the event time attribute and any other column with metadata that contains the spark.watermarkDelayMs key.
For the event time attribute, output updates the metadata to include the delay interval for the spark.watermarkDelayMs key.
For any other column (not the event time attribute) with the spark.watermarkDelayMs key, output removes the key from the attribute metadata.
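The three cases above (event time attribute, other attribute carrying the key, everything else) can be sketched with a simplified attribute model. `AttributeSketch` and `OutputSketch` are hypothetical names, metadata is reduced to a `Map[String, Long]`, and attributes are matched by name purely to keep the example small; only the `spark.watermarkDelayMs` key comes from the text.

```scala
object OutputSketch {
  // Metadata key named in the text above.
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical, heavily simplified attribute: a name plus long-valued metadata.
  final case class AttributeSketch(name: String, metadata: Map[String, Long] = Map.empty)

  def output(
      childOutput: Seq[AttributeSketch],
      eventTime: String, // name of the event time attribute (simplification: match by name)
      delayMs: Long): Seq[AttributeSketch] =
    childOutput.map { a =>
      if (a.name == eventTime) {
        // Event time attribute: record (or overwrite) the delay under the key.
        a.copy(metadata = a.metadata + (DelayKey -> delayMs))
      } else if (a.metadata.contains(DelayKey)) {
        // Any other attribute that still carries the key: drop the key.
        a.copy(metadata = a.metadata - DelayKey)
      } else {
        // Everything else is passed through untouched.
        a
      }
    }
}

object OutputDemo {
  import OutputSketch._

  val child: Seq[AttributeSketch] = Seq(
    AttributeSketch("time"),
    AttributeSketch("oldTime", Map(DelayKey -> 5000L)),
    AttributeSketch("value"))

  val result: Seq[AttributeSketch] = output(child, eventTime = "time", delayMs = 10000L)
}
```

In this model at most one output attribute ends up carrying the key, which is the effect the two rules above combine to produce.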
Demo¶
Check out Demo: Streaming Watermark with Aggregation in Append Output Mode to deep dive into the internals of Streaming Watermark.