
WatermarkTracker

WatermarkTracker tracks the event-time watermark of a streaming query (across EventTimeWatermarkExec operators in a physical query plan) based on a given MultipleWatermarkPolicy.

WatermarkTracker is used in MicroBatchExecution.

Creating Instance

WatermarkTracker takes the following to be created:

  • MultipleWatermarkPolicy

WatermarkTracker is created (using apply) when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).

MultipleWatermarkPolicy

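WatermarkTracker is given a MultipleWatermarkPolicy when created. The policy can be one of the following:

  • MinWatermark (alias: min): the global watermark is the minimum of the per-operator watermarks, so the slowest input stream determines it. This is the default.
  • MaxWatermark (alias: max): the global watermark is the maximum of the per-operator watermarks, so the fastest input stream determines it.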
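To make the difference between the two policies concrete, here is a minimal Scala sketch that does not depend on Spark. MinPolicy and MaxPolicy are hypothetical stand-ins playing the roles of MinWatermark and MaxWatermark (they are not Spark's classes); they only show how a single global watermark could be chosen from several per-operator watermarks, all in milliseconds.

```scala
// Hypothetical, simplified stand-ins for the two policies (not Spark's classes).
object PolicySketch {
  sealed trait WatermarkPolicy {
    // Chooses one global watermark (ms) from the per-operator watermarks (ms).
    // Assumes a non-empty input.
    def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
  }

  // "min": the slowest input stream holds back the global watermark.
  case object MinPolicy extends WatermarkPolicy {
    def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.min
  }

  // "max": the fastest input stream drives the global watermark.
  case object MaxPolicy extends WatermarkPolicy {
    def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.max
  }
}

import PolicySketch._

val perOperatorWatermarks = Seq(5000L, 12000L, 9000L)
println(MinPolicy.chooseGlobalWatermark(perOperatorWatermarks)) // 5000
println(MaxPolicy.chooseGlobalWatermark(perOperatorWatermarks)) // 12000
```

With three watermark operators at 5, 12 and 9 seconds, min keeps the global watermark at the slowest stream (5000 ms), while max follows the fastest one (12000 ms) at the cost of treating more rows from the slower streams as late.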

Creating WatermarkTracker

apply(
  conf: RuntimeConfig): WatermarkTracker

apply reads the global watermark policy from the spark.sql.streaming.multipleWatermarkPolicy configuration property (default: min) and creates a WatermarkTracker with it.

apply is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
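The policy lookup in apply can be sketched as follows. This is a hypothetical, Spark-free illustration: a plain Map[String, String] stands in for RuntimeConfig, and policyFrom, MinPolicy and MaxPolicy are made-up names. Only the property name and the min default come from the text above; the case-insensitive match and the exception for unknown names are assumptions of the sketch.

```scala
import java.util.Locale

// Hypothetical sketch of resolving the policy name from configuration (not Spark's code).
object ApplySketch {
  sealed trait WatermarkPolicy
  case object MinPolicy extends WatermarkPolicy
  case object MaxPolicy extends WatermarkPolicy

  val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"
  val DefaultPolicyName = "min"

  // A plain Map stands in for RuntimeConfig; a missing key falls back to "min".
  def policyFrom(conf: Map[String, String]): WatermarkPolicy =
    conf.getOrElse(PolicyKey, DefaultPolicyName).toLowerCase(Locale.ROOT) match {
      case "min" => MinPolicy
      case "max" => MaxPolicy
      case other =>
        throw new IllegalArgumentException(s"Unknown watermark policy: $other")
    }
}

import ApplySketch._

println(policyFrom(Map.empty))               // MinPolicy (the default)
println(policyFrom(Map(PolicyKey -> "MAX"))) // MaxPolicy
```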

Global Event-Time Watermark

globalWatermarkMs: Long

WatermarkTracker uses the globalWatermarkMs internal registry to keep track of the global event-time watermark (chosen using the MultipleWatermarkPolicy across all EventTimeWatermarkExec operators in a physical query plan).

Default: 0

globalWatermarkMs is used when WatermarkTracker is requested to updateWatermark.

The event-time watermark can be updated in setWatermark and updateWatermark.

The event-time watermark is available through the currentWatermark method and is used when the MicroBatchExecution stream execution engine is requested to populateStartOffsets, constructNextBatch and runBatch.

Setting Watermark

setWatermark(
  newWatermarkMs: Long): Unit

setWatermark sets the global event-time watermark to the given newWatermarkMs value.
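Since setWatermark sets the global watermark to the given value, it overwrites whatever was there before. The sketch below is a hypothetical, minimal stand-in (TrackerState is a made-up class, not Spark's WatermarkTracker) that keeps a globalWatermarkMs starting at 0 and exposes it through a currentWatermark-like accessor.

```scala
// Hypothetical, minimal stand-in for the tracker's global-watermark state.
object SetWatermarkSketch {
  class TrackerState {
    // Plays the role of globalWatermarkMs: 0 until something sets or advances it.
    private var globalWatermarkMs: Long = 0L

    // Overwrites the global watermark unconditionally,
    // e.g. with a value restored from a checkpoint.
    def setWatermark(newWatermarkMs: Long): Unit = synchronized {
      globalWatermarkMs = newWatermarkMs
    }

    // Read-only view, analogous to currentWatermark.
    def currentWatermark: Long = synchronized { globalWatermarkMs }
  }
}

import SetWatermarkSketch._

val state = new TrackerState
println(state.currentWatermark) // 0
state.setWatermark(1500L)       // e.g. a watermark recovered at restart
println(state.currentWatermark) // 1500
```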


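setWatermark is used when MicroBatchExecution is requested to populate start offsets while restarting a streaming query from a checkpoint (to restore the batch watermark recorded in the offset log metadata).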

Updating Watermark (at Execution)

updateWatermark(
  executedPlan: SparkPlan): Unit

updateWatermark requests the given SparkPlan physical operator to collect all EventTimeWatermarkExec unary physical operators.

updateWatermark simply exits when no EventTimeWatermarkExec was found.

updateWatermark...FIXME
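The per-operator bookkeeping (operatorToWatermarkMap) and the policy-based choice of the global watermark could look roughly like the following Spark-free sketch. It assumes that each watermark operator reports how many rows it saw in the batch, the maximum event time it observed and its delay threshold, and that an operator's watermark is that maximum event time minus the delay. WatermarkOperatorStats and TrackerSketch are hypothetical names, a plain Seq replaces the collected EventTimeWatermarkExec operators, and a function replaces MultipleWatermarkPolicy; treat it as an illustration, not as Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free sketch of updating watermarks after a micro-batch.
object UpdateWatermarkSketch {
  // Stand-in for what a watermark operator reports after a batch:
  // rows seen, max event time seen (ms) and configured delay (ms).
  final case class WatermarkOperatorStats(count: Long, maxEventTimeMs: Long, delayMs: Long)

  // `choose` stands in for the MultipleWatermarkPolicy (e.g. _.min or _.max).
  class TrackerSketch(choose: Seq[Long] => Long) {
    // Keyed by the operator's position among the watermark operators.
    private val operatorToWatermarkMap = mutable.HashMap.empty[Int, Long]
    private var globalWatermarkMs: Long = 0L

    def currentWatermark: Long = synchronized { globalWatermarkMs }

    def updateWatermark(operators: Seq[WatermarkOperatorStats]): Unit = synchronized {
      // Nothing to do when there are no watermark operators.
      if (operators.nonEmpty) {
        operators.zipWithIndex.foreach {
          case (op, index) if op.count > 0 =>
            val newWatermarkMs = op.maxEventTimeMs - op.delayMs
            // Per-operator watermarks never move backwards.
            if (operatorToWatermarkMap.get(index).forall(newWatermarkMs > _)) {
              operatorToWatermarkMap(index) = newWatermarkMs
            }
          case (_, index) =>
            // No rows seen yet for this operator: register it with 0.
            if (!operatorToWatermarkMap.contains(index)) operatorToWatermarkMap(index) = 0L
        }
        // The global watermark never moves backwards either.
        val chosen = choose(operatorToWatermarkMap.values.toSeq)
        if (chosen > globalWatermarkMs) globalWatermarkMs = chosen
      }
    }
  }
}

import UpdateWatermarkSketch._

val minTracker = new TrackerSketch(_.min)
val maxTracker = new TrackerSketch(_.max)
val batch = Seq(
  WatermarkOperatorStats(count = 3, maxEventTimeMs = 20000L, delayMs = 5000L), // 15000
  WatermarkOperatorStats(count = 2, maxEventTimeMs = 12000L, delayMs = 2000L)  // 10000
)
minTracker.updateWatermark(batch)
maxTracker.updateWatermark(batch)
println(minTracker.currentWatermark) // 10000
println(maxTracker.currentWatermark) // 15000
```

In this sketch both the per-operator and the global watermarks only move forward, and an operator that has not seen any rows yet is registered with 0, so under the min policy it holds the global watermark at 0 until data arrives.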


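updateWatermark is used when MicroBatchExecution is requested to run a micro-batch (runBatch), with the executed physical plan of that batch.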

Watermarks by EventTimeWatermarkExec Operator Registry

operatorToWatermarkMap: Map[Int, Long]

WatermarkTracker uses the operatorToWatermarkMap internal registry to keep track of the event-time watermark of every EventTimeWatermarkExec physical operator in a streaming query plan (keyed by the operator's position among the EventTimeWatermarkExec operators in the plan).

operatorToWatermarkMap is used when WatermarkTracker is requested to updateWatermark.

Logging

Enable the ALL logging level for the org.apache.spark.sql.execution.streaming.WatermarkTracker logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.WatermarkTracker=ALL

Refer to Logging.