Skip to content

EventTimeWatermarkExec Physical Operator

EventTimeWatermarkExec is a unary physical operator that represents EventTimeWatermark logical operator at execution time.

Tip

A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Learn more about Unary Physical Operators (and physical operators in general) in The Internals of Spark SQL online book.

EventTimeWatermarkExec operator is used to extract (project) the values of the event-time watermark column and add them all to the EventTimeStatsAccum accumulator (and produce a EventTimeStats).

Creating Instance

EventTimeWatermarkExec takes the following to be created:

When created, EventTimeWatermarkExec registers the EventTimeStatsAccum accumulator (with the current SparkContext).

EventTimeWatermarkExec is created when StatefulAggregationStrategy execution planning strategy is executed (requested to plan a EventTimeWatermark logical operator for execution).

EventTimeStats Accumulator

eventTimeStats: EventTimeStatsAccum

EventTimeWatermarkExec creates an EventTimeStatsAccum accumulator when created.

When executed, EventTimeWatermarkExec uses the EventTimeStatsAccum to extract and accumulate eventTime values (as Longs) from every row in a streaming batch.

Note

Since the execution (data processing) happens on Spark executors, the only way to establish communication between the tasks (on the executors) and the driver is to use accumulator facility.

Learn more about Accumulators in The Internals of Apache Spark online book.

eventTimeStats is registered (with the current SparkContext) when EventTimeWatermarkExec is created. eventTimeStats uses no name (unnamed accumulator).

eventTimeStats is used to transfer the statistics (maximum, minimum, average and update count) of the long values in the event-time watermark column to be used for the following:

  • ProgressReporter is requested for the most recent execution statistics (for max, min, avg, and watermark event-time watermark statistics)

  • WatermarkTracker is requested to updateWatermark

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan (Spark SQL) abstraction.

doExecute executes the child physical operator and maps over the partitions (using RDD.mapPartitions).

doExecute creates an unsafe projection (per partition) for the column with the event time in the output schema of the child physical operator. The unsafe projection is to extract event times from the (stream of) internal rows of the child physical operator.

For every row in a partition, doExecute requests the eventTimeStats accumulator to accumulate the event time.

Note

The event time value is in seconds (not millis as the value is divided by 1000 ).

Output Attributes

output: Seq[Attribute]

output is part of the QueryPlan (Spark SQL) abstraction.

output requests the child physical operator for the output attributes to find the event time attribute and any other column with metadata that contains spark.watermarkDelayMs key.

For the event time attribute, output updates the metadata to include the delay interval for the spark.watermarkDelayMs key.

For any other column (not the event time attribute) with the spark.watermarkDelayMs key, output removes the key from the attribute metadata.

Demo

Check out Demo: Streaming Watermark with Aggregation in Append Output Mode to deep dive into the internals of Streaming Watermark.