WatermarkTracker
WatermarkTracker tracks the event-time watermark of a streaming query (across EventTimeWatermarkExec operators in a physical query plan) based on a given MultipleWatermarkPolicy.
WatermarkTracker is used in MicroBatchExecution.
Creating Instance
WatermarkTracker takes the following to be created:
MultipleWatermarkPolicy
WatermarkTracker is created (using apply) when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
MultipleWatermarkPolicy
WatermarkTracker is given a MultipleWatermarkPolicy when created that can be one of the following:
MaxWatermark (alias: max)
MinWatermark (alias: min)
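To make the difference between the two policies concrete, here is a minimal sketch of how each one could reduce several per-operator watermarks to a single global value. The names WatermarkPolicySketch, Policy, Min, Max and choose are hypothetical stand-ins chosen for illustration, not the Spark classes themselves.

```scala
// Hypothetical stand-ins for illustration; not the Spark classes themselves.
object WatermarkPolicySketch {
  sealed trait Policy {
    // Picks one global watermark (ms) from the per-operator watermarks (ms).
    def choose(operatorWatermarksMs: Seq[Long]): Long
  }

  // "min": the global watermark trails the slowest operator.
  case object Min extends Policy {
    def choose(operatorWatermarksMs: Seq[Long]): Long = {
      require(operatorWatermarksMs.nonEmpty, "need at least one watermark")
      operatorWatermarksMs.min
    }
  }

  // "max": the global watermark follows the fastest operator.
  case object Max extends Policy {
    def choose(operatorWatermarksMs: Seq[Long]): Long = {
      require(operatorWatermarksMs.nonEmpty, "need at least one watermark")
      operatorWatermarksMs.max
    }
  }
}
```

With operator watermarks of 5000 ms and 12000 ms, the min policy would give 5000 ms (less data is treated as late), while the max policy would give 12000 ms (state can be dropped sooner, at the risk of discarding data from the slower stream).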
Creating WatermarkTracker
apply(
  conf: RuntimeConfig): WatermarkTracker
apply uses the spark.sql.streaming.multipleWatermarkPolicy configuration property for the global watermark policy (default: min) and creates a WatermarkTracker.
apply is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
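The lookup described above can be sketched as follows. This is only an illustration under stated assumptions: a plain Map stands in for RuntimeConfig, and PolicyConfigSketch and resolvePolicyName are hypothetical names. The case-insensitive matching and the exception on unknown values are also assumptions of this sketch, not behavior stated on this page.

```scala
// Hypothetical helper for illustration; a Map stands in for RuntimeConfig.
object PolicyConfigSketch {
  // Property name and default value as given in the text above.
  val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"
  val DefaultPolicy = "min"

  // Returns the normalized policy alias ("min" or "max").
  def resolvePolicyName(conf: Map[String, String]): String = {
    // Fall back to the default when the property is not set.
    val name = conf.getOrElse(PolicyKey, DefaultPolicy)
      .trim
      .toLowerCase(java.util.Locale.ROOT) // assumption: aliases are case-insensitive
    name match {
      case "min" | "max" => name
      case other =>
        // Assumption: unknown aliases are rejected rather than silently ignored.
        throw new IllegalArgumentException(s"Unknown multiple watermark policy: $other")
    }
  }
}
```

For example, `PolicyConfigSketch.resolvePolicyName(Map.empty)` falls back to the default alias, and a map that sets the property to `MAX` resolves to the max alias.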
Global Event-Time Watermark
globalWatermarkMs: Long
WatermarkTracker uses the globalWatermarkMs internal registry to keep track of the global event-time watermark (based on MultipleWatermarkPolicy across all EventTimeWatermarkExec operators in a physical query plan).
Default: 0
globalWatermarkMs is used when WatermarkTracker is requested to updateWatermark.
The event-time watermark can be updated in setWatermark and updateWatermark.
The event-time watermark is used (through the currentWatermark method) when the MicroBatchExecution stream execution engine is requested to populateStartOffsets, constructNextBatch and runBatch.
Updating Watermark (at Startup and Restart)
setWatermark(
  newWatermarkMs: Long): Unit
setWatermark sets the global event-time watermark to the given newWatermarkMs value.
setWatermark is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
Updating Watermark (at Execution)
updateWatermark(
  executedPlan: SparkPlan): Unit
updateWatermark requests the given SparkPlan physical operator to collect all EventTimeWatermarkExec unary physical operators.
updateWatermark simply exits when no EventTimeWatermarkExec was found.
updateWatermark ...FIXME
updateWatermark is used when MicroBatchExecution is requested to run a single streaming batch (when requested to run an activated streaming query).
Watermarks by EventTimeWatermarkExec Operator Registry
operatorToWatermarkMap: Map[Int, Long]
WatermarkTracker uses the operatorToWatermarkMap internal registry to keep track of the event-time watermark of every EventTimeWatermarkExec physical operator in a streaming query plan.
operatorToWatermarkMap is used when WatermarkTracker is requested to updateWatermark.
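The following sketch shows one way the two registries could work together. It is a simplified model, not the Spark implementation: WatermarkRegistrySketch and Tracker are hypothetical names, the policy is passed in as a plain function, and updateWatermark takes a map of already-computed candidate watermarks per operator instead of a SparkPlan. The rule that watermarks never move backwards is an assumption of this sketch.

```scala
import scala.collection.mutable

// Hypothetical, simplified model for illustration; not the Spark class.
object WatermarkRegistrySketch {
  // `chooseGlobal` plays the role of the MultipleWatermarkPolicy.
  final class Tracker(chooseGlobal: Seq[Long] => Long) {
    // Per-operator watermarks (ms), keyed by the operator's index in the plan.
    private val operatorToWatermarkMap = mutable.HashMap.empty[Int, Long]
    // Global watermark (ms); starts at 0 as described in the text.
    private var globalWatermarkMs: Long = 0L

    // Start or restart: take the value as given (e.g. restored from a checkpoint).
    def setWatermark(newWatermarkMs: Long): Unit = {
      globalWatermarkMs = newWatermarkMs
    }

    // After a batch: `observed` maps operator index -> candidate watermark (ms).
    def updateWatermark(observed: Map[Int, Long]): Unit = {
      // No watermark operators reported anything, so there is nothing to do.
      if (observed.nonEmpty) {
        observed.foreach { case (index, candidateMs) =>
          // Assumption: a per-operator watermark never moves backwards.
          val previousMs = operatorToWatermarkMap.getOrElse(index, 0L)
          operatorToWatermarkMap(index) = math.max(previousMs, candidateMs)
        }
        val chosenMs = chooseGlobal(operatorToWatermarkMap.values.toSeq)
        // Assumption: the global watermark never moves backwards either.
        if (chosenMs > globalWatermarkMs) globalWatermarkMs = chosenMs
      }
    }

    def currentWatermark: Long = globalWatermarkMs
  }
}
```

A tracker built with `new WatermarkRegistrySketch.Tracker(_.min)` models the min policy. Calling `updateWatermark` with an empty map leaves `currentWatermark` unchanged, which mirrors the early exit described for updateWatermark when no EventTimeWatermarkExec is found.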
Logging
Enable ALL logging level for org.apache.spark.sql.execution.streaming.WatermarkTracker logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.WatermarkTracker=ALL
Refer to Logging.