WatermarkTracker
WatermarkTracker tracks the global event-time watermark of a streaming query across all EventTimeWatermarkExec operators in its physical query plan, combining their watermarks according to a given MultipleWatermarkPolicy.
WatermarkTracker is used in MicroBatchExecution.
Creating Instance
WatermarkTracker takes a single MultipleWatermarkPolicy to be created.
WatermarkTracker is created (using apply) when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
MultipleWatermarkPolicy
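When created, WatermarkTracker is given a MultipleWatermarkPolicy, which is one of the following:
MinWatermark (alias: min): the global watermark is the minimum of the per-operator watermarks
MaxWatermark (alias: max): the global watermark is the maximum of the per-operator watermarks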
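To illustrate how the two policies differ, here is a standalone Scala script that re-creates them as a hypothetical WatermarkPolicy trait with MinPolicy and MaxPolicy objects. These names are illustrative and not Spark's classes; the sketch assumes each policy simply takes the minimum or maximum of the per-operator watermarks.

```scala
// Hypothetical re-creation of the two policies (not imported from Spark).
sealed trait WatermarkPolicy {
  // Picks the global watermark (ms) from the per-operator watermarks.
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

// "min": the global watermark follows the slowest input.
case object MinPolicy extends WatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "no operator watermarks")
    operatorWatermarks.min
  }
}

// "max": the global watermark follows the fastest input.
case object MaxPolicy extends WatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "no operator watermarks")
    operatorWatermarks.max
  }
}

val operatorWatermarks = Seq(5000L, 9000L)
println(MinPolicy.chooseGlobalWatermark(operatorWatermarks)) // 5000
println(MaxPolicy.chooseGlobalWatermark(operatorWatermarks)) // 9000
```

With min, a slow or idle input holds back the global watermark, which is the conservative choice. With max, the fastest input drives the global watermark, at the risk of dropping more late rows from slower inputs.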
Creating WatermarkTracker
```scala
apply(conf: RuntimeConfig): WatermarkTracker
```
apply reads the spark.sql.streaming.multipleWatermarkPolicy configuration property (default: min) to choose the global watermark policy and creates a WatermarkTracker with it.
apply is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
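The following Scala script sketches the policy resolution that apply performs. A plain Map[String, String] stands in for RuntimeConfig, and resolvePolicy, byMin and byMax are hypothetical helpers; the case-insensitive matching and the IllegalArgumentException for an unknown name are assumptions of the sketch.

```scala
import java.util.Locale

// Configuration key from this page; the Map passed below stands in for RuntimeConfig.
val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"
val DefaultPolicyName = "min"

// Hypothetical policy functions: pick the global watermark (ms) from per-operator values.
val byMin: Seq[Long] => Long = _.min
val byMax: Seq[Long] => Long = _.max

// Hypothetical helper: resolve the configured policy name (assumed case-insensitive).
def resolvePolicy(conf: Map[String, String]): Seq[Long] => Long =
  conf.getOrElse(PolicyKey, DefaultPolicyName).toLowerCase(Locale.ROOT) match {
    case "min" => byMin
    case "max" => byMax
    case other =>
      throw new IllegalArgumentException(s"Could not recognize watermark policy '$other'")
  }

val operatorWatermarks = Seq(5000L, 9000L)
println(resolvePolicy(Map.empty)(operatorWatermarks))               // 5000 (default: min)
println(resolvePolicy(Map(PolicyKey -> "max"))(operatorWatermarks)) // 9000
```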
Global Event-Time Watermark
```scala
globalWatermarkMs: Long
```
WatermarkTracker uses the globalWatermarkMs internal registry to keep track of the global event-time watermark (chosen by the MultipleWatermarkPolicy across all EventTimeWatermarkExec operators in a physical query plan).
Default: 0
globalWatermarkMs is used when WatermarkTracker is requested to updateWatermark.
The event-time watermark can be updated in setWatermark and updateWatermark.
The event-time watermark is exposed as the currentWatermark method and is used when the MicroBatchExecution stream execution engine is requested to populateStartOffsets, constructNextBatch, and runBatch.
Setting Watermark
```scala
setWatermark(newWatermarkMs: Long): Unit
```
setWatermark sets the global event-time watermark to the given newWatermarkMs value.
setWatermark is used when:
MicroBatchExecution is requested to populate start offsets at startup or restart (from a checkpoint)
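setWatermark and currentWatermark can be modeled with a single field guarded by synchronized. The GlobalWatermark class below is hypothetical and only mirrors the behavior described above: the value starts at 0, and setWatermark overwrites it unconditionally (for example, with a watermark restored from a checkpoint).

```scala
// Hypothetical model of the globalWatermarkMs registry (not Spark's class).
class GlobalWatermark {
  // Starts at 0, like globalWatermarkMs.
  private var globalWatermarkMs: Long = 0L

  // Like setWatermark: overwrites the value unconditionally.
  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    globalWatermarkMs = newWatermarkMs
  }

  // Like currentWatermark: reads the current value.
  def currentWatermark: Long = synchronized { globalWatermarkMs }
}

val tracker = new GlobalWatermark
println(tracker.currentWatermark)    // 0
tracker.setWatermark(1700000000000L) // e.g. a watermark restored from a checkpoint
println(tracker.currentWatermark)    // 1700000000000
tracker.setWatermark(1600000000000L) // no monotonicity check in this model
println(tracker.currentWatermark)    // 1600000000000
```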
Updating Watermark (at Execution)
```scala
updateWatermark(executedPlan: SparkPlan): Unit
```
updateWatermark requests the given SparkPlan physical operator to collect all EventTimeWatermarkExec unary physical operators.
updateWatermark simply exits when no EventTimeWatermarkExec operators are found.
updateWatermark...FIXME
updateWatermark is used when:
MicroBatchExecution is requested to run a single streaming batch
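Because the remaining steps of updateWatermark are not described above, the following Scala script is only a simplified model, assuming the logic found in Spark 3.x sources: each operator's watermark is its maximum observed event time minus its delay, operators without data are registered with 0, both per-operator and global watermarks only move forward, and the policy picks the global value from the per-operator registry. WatermarkOperator and TrackerModel are hypothetical names, and the policy is passed as a plain function.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an EventTimeWatermarkExec operator: the maximum event
// time observed in the last batch (None if no rows arrived) and its delay threshold.
final case class WatermarkOperator(maxEventTimeMs: Option[Long], delayMs: Long)

// Simplified model of updateWatermark; the policy is passed as a plain function.
final class TrackerModel(choosePolicy: Seq[Long] => Long) {
  // Like operatorToWatermarkMap: operator index in the plan -> watermark (ms).
  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
  // Like globalWatermarkMs.
  private var globalWatermarkMs: Long = 0L

  def updateWatermark(operators: Seq[WatermarkOperator]): Unit = synchronized {
    // Nothing to do when the plan has no watermark operators.
    if (operators.nonEmpty) {
      operators.zipWithIndex.foreach {
        case (WatermarkOperator(Some(maxMs), delayMs), index) =>
          val candidate = maxMs - delayMs
          // A per-operator watermark only moves forward.
          if (operatorToWatermarkMap.get(index).forall(prev => candidate > prev)) {
            operatorToWatermarkMap(index) = candidate
          }
        case (_, index) =>
          // No data seen yet for this operator: register it with 0.
          operatorToWatermarkMap.getOrElseUpdate(index, 0L)
      }
      // The global watermark only moves forward as well.
      val chosen = choosePolicy(operatorToWatermarkMap.values.toSeq)
      if (chosen > globalWatermarkMs) globalWatermarkMs = chosen
    }
  }

  def currentWatermark: Long = synchronized { globalWatermarkMs }
}

val minTracker = new TrackerModel(_.min)
minTracker.updateWatermark(Seq(
  WatermarkOperator(Some(10000L), 2000L), // candidate 8000
  WatermarkOperator(None, 1000L)))        // no data yet: 0
println(minTracker.currentWatermark)      // 0 (the idle operator keeps the minimum at 0)
minTracker.updateWatermark(Seq(
  WatermarkOperator(Some(12000L), 2000L), // candidate 10000
  WatermarkOperator(Some(7000L), 1000L))) // candidate 6000
println(minTracker.currentWatermark)      // 6000
```

The first call shows why min is the conservative default: an operator that has not seen any data yet keeps the global watermark from advancing.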
Watermarks by EventTimeWatermarkExec Operator Registry
```scala
operatorToWatermarkMap: Map[Int, Long]
```
WatermarkTracker uses the operatorToWatermarkMap internal registry to keep track of the event-time watermark of every EventTimeWatermarkExec physical operator in a streaming query plan, keyed by the operator's position (index) in the plan.
operatorToWatermarkMap is used when WatermarkTracker is requested to updateWatermark.
Logging
Enable the ALL logging level for the org.apache.spark.sql.execution.streaming.WatermarkTracker logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.WatermarkTracker=ALL
Refer to Logging.