EventTimeWatermarkExec Physical Operator¶
EventTimeWatermarkExec is a unary physical operator that represents EventTimeWatermark logical operator at execution time.
Tip
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.
Learn more about Unary Physical Operators (and physical operators in general) in The Internals of Spark SQL online book.
EventTimeWatermarkExec operator is used to extract (project) the values of the event-time watermark column and add them all to the EventTimeStatsAccum accumulator (and produce an EventTimeStats).
Creating Instance¶
EventTimeWatermarkExec takes the following to be created:
- Catalyst Attribute for event time (Spark SQL)
- Delay Interval (Spark SQL)
- Child Physical Operator (Spark SQL)
When created, EventTimeWatermarkExec registers the EventTimeStatsAccum accumulator (with the current SparkContext).
EventTimeWatermarkExec is created when StatefulAggregationStrategy execution planning strategy is executed (requested to plan an EventTimeWatermark logical operator for execution).
EventTimeStats Accumulator¶
eventTimeStats: EventTimeStatsAccum
EventTimeWatermarkExec creates an EventTimeStatsAccum accumulator when created.
When executed, EventTimeWatermarkExec uses the EventTimeStatsAccum to extract and accumulate eventTime values (as Long values) from every row in a streaming batch.
Note
Since the execution (data processing) happens on Spark executors, the only way to establish communication between the tasks (on the executors) and the driver is to use the accumulator facility.
Learn more about Accumulators in The Internals of Apache Spark online book.
eventTimeStats is registered (with the current SparkContext) when EventTimeWatermarkExec is created. eventTimeStats uses no name (unnamed accumulator).
eventTimeStats is used to transfer the statistics (maximum, minimum, average and update count) of the long values in the event-time watermark column to be used for the following:
- ProgressReporter is requested for the most recent execution statistics (for max, min, avg, and watermark event-time watermark statistics)
- WatermarkTracker is requested to updateWatermark
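To make the statistics concrete, the following is a minimal, Spark-free sketch of how maximum, minimum, average and update count could be accumulated per task and then merged. The names EventTimeStatsSketch, Stats, add, merge and zero are assumptions for illustration only and are not Spark's own source code.

```scala
// Hypothetical sketch; it only illustrates the statistics named in the text.
object EventTimeStatsSketch {
  // Running statistics over event-time values (as Long values)
  final case class Stats(max: Long, min: Long, avg: Double, count: Long) {
    // Folds a single event-time value into the statistics
    def add(eventTime: Long): Stats = {
      val newCount = count + 1
      Stats(
        max = math.max(max, eventTime),
        min = math.min(min, eventTime),
        // Incremental mean, so no running sum of large Long values is kept
        avg = avg + (eventTime - avg) / newCount,
        count = newCount)
    }

    // Combines the statistics of two tasks (what an accumulator merge would do)
    def merge(that: Stats): Stats =
      if (that.count == 0) this
      else if (count == 0) that
      else {
        val total = count + that.count
        Stats(
          max = math.max(max, that.max),
          min = math.min(min, that.min),
          // Count-weighted combination of the two averages
          avg = avg + (that.avg - avg) * that.count / total,
          count = total)
      }
  }

  // Empty statistics: nothing has been observed yet
  val zero: Stats = Stats(Long.MinValue, Long.MaxValue, 0.0, 0L)
}
```

In this sketch every task would start from zero and call add per row, while the driver would merge the per-task results into a single Stats value.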
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions).
doExecute creates an unsafe projection (per partition) for the column with the event time in the output schema of the child physical operator. The unsafe projection is used to extract event times from the (stream of) internal rows of the child physical operator.
For every row in a partition, doExecute requests the eventTimeStats accumulator to accumulate the event time.
Note
The event time value is in milliseconds (the internal timestamp value, which is in microseconds, is divided by 1000).
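The per-partition flow above can be sketched without Spark. This is a simplified illustration, not the operator's implementation: DoExecuteSketch, Row, MaxAccumulator and execute are hypothetical names, plain collections stand in for RDD partitions, and the stand-in accumulator tracks only the maximum.

```scala
// Hypothetical, Spark-free sketch of the per-partition flow described above.
object DoExecuteSketch {
  // Stand-in for an internal row: only the raw event-time column is modelled
  final case class Row(rawEventTime: Long)

  // Stand-in for the accumulator: remembers the largest value reported by tasks
  final class MaxAccumulator {
    private var maxSeen = Long.MinValue
    def add(v: Long): Unit = { maxSeen = math.max(maxSeen, v) }
    def value: Long = maxSeen
  }

  // Maps over every partition, reports each row's event time to the accumulator,
  // and passes the rows through unchanged
  def execute(partitions: Seq[Seq[Row]], acc: MaxAccumulator): Seq[Seq[Row]] =
    partitions.map { partition =>
      // "Projection" created once per partition: extracts the event-time column
      val getEventTime: Row => Long = _.rawEventTime
      partition.map { row =>
        // The raw value is divided by 1000 before it is accumulated
        acc.add(getEventTime(row) / 1000)
        row
      }
    }
}
```

The rows themselves are not modified in this sketch: the operator only observes the event-time column on the way through, which is why the output equals the input.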
Output Attributes¶
output: Seq[Attribute]
output is part of the QueryPlan (Spark SQL) abstraction.
output requests the child physical operator for the output attributes to find the event time attribute and any other column with metadata that contains the spark.watermarkDelayMs key.
For the event time attribute, output updates the metadata to include the delay interval for the spark.watermarkDelayMs key.
For any other column (not the event time attribute) with the spark.watermarkDelayMs key, output removes the key from the attribute metadata.
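The metadata rewrite can be illustrated with a small, Spark-free sketch. Attr is a hypothetical stand-in for a Catalyst attribute, attributes are matched by name purely for simplicity (an assumption of this sketch), and the delay is assumed to be given in milliseconds, as the key name suggests.

```scala
// Hypothetical sketch of the metadata handling described above.
object OutputAttributesSketch {
  val DelayKey = "spark.watermarkDelayMs"

  // Stand-in for a Catalyst attribute: a name plus metadata entries
  final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

  def output(childOutput: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
    childOutput.map { a =>
      if (a.name == eventTime) {
        // Event time attribute: record (or overwrite) the delay under the key
        a.copy(metadata = a.metadata + (DelayKey -> delayMs))
      } else if (a.metadata.contains(DelayKey)) {
        // Any other column carrying the key: drop the stale watermark marker
        a.copy(metadata = a.metadata - DelayKey)
      } else {
        a // Columns without the key are left untouched
      }
    }
}
```

After the rewrite, at most one attribute in the sketch's output carries the spark.watermarkDelayMs key, namely the event time attribute.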
Demo¶
Check out Demo: Streaming Watermark with Aggregation in Append Output Mode to deep dive into the internals of Streaming Watermark.