WatermarkTracker
WatermarkTracker tracks the event-time watermark of a streaming query (across the EventTimeWatermarkExec operators in its physical query plan) based on a given MultipleWatermarkPolicy.
WatermarkTracker is used in MicroBatchExecution.
Creating Instance
WatermarkTracker takes the following to be created:
MultipleWatermarkPolicy
WatermarkTracker is created (using apply) when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
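MultipleWatermarkPolicy
WatermarkTracker is given a MultipleWatermarkPolicy when created, which can be one of the following:
MaxWatermark (alias: max): the global watermark is the maximum of the per-operator watermarks
MinWatermark (alias: min, the default): the global watermark is the minimum of the per-operator watermarks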
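The difference between the two policies can be made concrete with a self-contained Scala model. This is not Spark's source. The method name chooseGlobalWatermark is an assumed, illustrative name. The model only assumes that each policy reduces the per-operator watermarks to one global value by taking their minimum or maximum.

```scala
// Self-contained model of the two policies (not Spark's source).
// `chooseGlobalWatermark` is an assumed, illustrative method name.
sealed trait MultipleWatermarkPolicy {
  // Reduces the watermarks (in ms) of all watermarked operators to one global value.
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

// Follows the slowest input stream.
case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.min
}

// Follows the fastest input stream.
case object MaxWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.max
}
```

Consider two watermarked input streams at 5000 ms and 12000 ms. MinWatermark yields 5000 ms, which holds the query back to the slower stream. MaxWatermark yields 12000 ms, which lets the faster stream drive the watermark, at the cost of possibly treating rows from the slower stream as late.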
Creating WatermarkTracker
apply(conf: RuntimeConfig): WatermarkTracker
apply uses the spark.sql.streaming.multipleWatermarkPolicy configuration property for the global watermark policy (default: min) and creates a WatermarkTracker.
apply is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
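The steps of apply can be modeled without Spark. This sketch makes several simplifications. A plain Map[String, String] stands in for RuntimeConfig, and the policies are reduced to functions. ApplySketch and Tracker are hypothetical names. Two behaviors are assumptions of the sketch rather than documented facts: matching the policy name case-insensitively, and rejecting unknown names with an exception.

```scala
import java.util.Locale

object ApplySketch {
  // Configuration key named on this page and its documented default.
  val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"
  val DefaultPolicyName = "min"

  // Policies reduced to "pick one watermark out of many" functions.
  sealed abstract class Policy(val choose: Seq[Long] => Long)
  case object MinWatermark extends Policy(_.min)
  case object MaxWatermark extends Policy(_.max)

  // Hypothetical stand-in for WatermarkTracker: it only holds its policy.
  final case class Tracker(policy: Policy)

  // Modeled on apply(conf: RuntimeConfig): read the property (default "min"),
  // resolve it to a policy (case-insensitively, an assumption) and create the tracker.
  def apply(conf: Map[String, String]): Tracker = {
    val name = conf.getOrElse(PolicyKey, DefaultPolicyName).toLowerCase(Locale.ROOT)
    val policy = name match {
      case "min" => MinWatermark
      case "max" => MaxWatermark
      case other => throw new IllegalArgumentException(s"Unknown watermark policy: $other")
    }
    Tracker(policy)
  }
}
```

For example, ApplySketch(Map.empty) yields a tracker with MinWatermark, which mirrors the min default. Setting the key to "max" selects MaxWatermark instead.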
Global Event-Time Watermark
globalWatermarkMs: Long
WatermarkTracker uses the globalWatermarkMs internal registry to keep track of the global event-time watermark (chosen by the MultipleWatermarkPolicy across all EventTimeWatermarkExec operators in a physical query plan).
Default: 0
globalWatermarkMs is used when WatermarkTracker is requested to updateWatermark.
The event-time watermark can be updated in setWatermark and updateWatermark.
The event-time watermark is used (as the currentWatermark method) when the MicroBatchExecution stream execution engine is requested to populateStartOffsets, constructNextBatch and runBatch.
Setting Watermark
setWatermark(newWatermarkMs: Long): Unit
setWatermark sets the global event-time watermark to the given newWatermarkMs value.
setWatermark is used when:
MicroBatchExecution is requested to populate start offsets at startup or restart (from a checkpoint)
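Here is a minimal model of the globalWatermarkMs registry together with setWatermark and currentWatermark. It assumes, as described above, that the registry starts at 0 and that setWatermark overwrites the value with the given one. GlobalWatermark is a hypothetical class name, and the synchronized blocks are a design choice of this sketch.

```scala
// Hypothetical minimal model of the global-watermark bookkeeping (not Spark's source).
final class GlobalWatermark {
  // Mirrors globalWatermarkMs: starts at 0 (no watermark yet).
  private var globalWatermarkMs: Long = 0L

  // Mirrors setWatermark: overwrite with the given value, e.g. one recovered
  // from the checkpoint when a query restarts.
  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    globalWatermarkMs = newWatermarkMs
  }

  // Mirrors currentWatermark: the value the engine reads while planning batches.
  def currentWatermark: Long = synchronized { globalWatermarkMs }
}
```

A fresh GlobalWatermark reports 0. After setWatermark(42000L), currentWatermark returns 42000. Because this model overwrites rather than compares, the last call wins.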
Updating Watermark (at Execution)
updateWatermark(executedPlan: SparkPlan): Unit
updateWatermark requests the given SparkPlan physical operator to collect all EventTimeWatermarkExec unary physical operators.
updateWatermark simply exits when no EventTimeWatermarkExec operators were found.
updateWatermark ...FIXME
updateWatermark is used when:
MicroBatchExecution is requested to run a single streaming batch
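This section does not document how updateWatermark proceeds after collecting the operators. The following is a simplified, self-contained model that rests on several assumptions:

- Each operator's candidate watermark is its maximum observed event time minus its delay.
- Per-operator and global watermarks only move forward.
- An operator that has not seen data yet counts as 0.
- The policy picks the global value from operatorToWatermarkMap.

WatermarkOp and TrackerSketch are hypothetical stand-ins for EventTimeWatermarkExec and WatermarkTracker.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an EventTimeWatermarkExec after a batch: the maximum
// event time it observed (None when no rows arrived) and its delay threshold.
final case class WatermarkOp(maxEventTimeMs: Option[Long], delayMs: Long)

// Hypothetical model of WatermarkTracker; `choose` plays the role of the policy.
final class TrackerSketch(choose: Seq[Long] => Long) {
  private var globalWatermarkMs: Long = 0L
  // Models operatorToWatermarkMap: operator position in the plan -> watermark (ms).
  private val operatorToWatermarkMap = mutable.HashMap.empty[Int, Long]

  def currentWatermark: Long = globalWatermarkMs

  def updateWatermark(watermarkOps: Seq[WatermarkOp]): Unit = {
    // No watermark operators in the plan: nothing to do.
    if (watermarkOps.isEmpty) return

    watermarkOps.zipWithIndex.foreach {
      case (WatermarkOp(Some(maxTs), delayMs), index) =>
        // Candidate watermark: max event time seen minus the delay threshold.
        val candidate = maxTs - delayMs
        // Keep the per-operator watermark only if it moves forward.
        if (operatorToWatermarkMap.get(index).forall(candidate > _)) {
          operatorToWatermarkMap(index) = candidate
        }
      case (_, index) =>
        // No data yet for this operator: register it with 0.
        if (!operatorToWatermarkMap.contains(index)) operatorToWatermarkMap(index) = 0L
    }

    // Let the policy combine per-operator watermarks; never move the global one back.
    val chosen = choose(operatorToWatermarkMap.values.toSeq)
    if (chosen > globalWatermarkMs) globalWatermarkMs = chosen
  }
}
```

Under the min policy, an operator that has seen no data yet (recorded as 0) keeps the global watermark at 0. Under the max policy, the other operator's watermark wins.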
Watermarks by EventTimeWatermarkExec Operator Registry
operatorToWatermarkMap: Map[Int, Long]
WatermarkTracker uses the operatorToWatermarkMap internal registry to keep track of the event-time watermark of every EventTimeWatermarkExec physical operator in a streaming query plan.
operatorToWatermarkMap is used when WatermarkTracker is requested to updateWatermark.
Logging
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.WatermarkTracker logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.WatermarkTracker=ALL
Refer to Logging.