WatermarkSupport Unary Physical Operators

WatermarkSupport is the <> of unary physical operators (UnaryExecNode) with support for streaming event-time watermark.


Watermark (aka "allowed lateness") is a moving threshold of event time and specifies what data to consider for aggregations, i.e. the threshold of late data so the engine can automatically drop incoming late data given event time and clean up old state accordingly.

Read the official documentation of Spark in[Handling Late Data and Watermarking].

[[properties]] .WatermarkSupport's (Lazily-Initialized) Properties [cols="1,3",options="header",width="100%"] |=== | Property | Description

| [[watermarkExpression]] watermarkExpression a| Optional Catalyst expression that matches rows older than the event time watermark.


Use withWatermark operator to specify streaming watermark.

When initialized, watermarkExpression finds spark.watermarkDelayMs watermark attribute in the child output's metadata.

If found, watermarkExpression creates evictionExpression with the watermark attribute that is less than or equal <>.

The watermark attribute may be of type StructType. If it is, watermarkExpression uses the first field as the watermark.

watermarkExpression prints out the following INFO message to the logs when spark.watermarkDelayMs watermark attribute is found.

INFO [physicalOperator]Exec: Filtering state store on: [evictionExpression]

NOTE: physicalOperator can be FlatMapGroupsWithStateExec,[StateStoreSaveExec] or physical-operators/[StreamingDeduplicateExec].

TIP: Enable INFO logging level for one of the stateful physical operators to see the INFO message in the logs.

| [[watermarkPredicateForData]] watermarkPredicateForData | Optional Predicate that uses <> and the child output to match rows older than the event-time watermark

| [[watermarkPredicateForKeys]] watermarkPredicateForKeys | Optional Predicate that uses <> to match rows older than the event time watermark. |===

=== [[contract]] WatermarkSupport Contract

[source, scala]

package org.apache.spark.sql.execution.streaming

trait WatermarkSupport extends UnaryExecNode { // only required methods that have no implementation def eventTimeWatermark: Option[Long] def keyExpressions: Seq[Attribute] }

.WatermarkSupport Contract [cols="1,2",options="header",width="100%"] |=== | Method | Description

| [[eventTimeWatermark]] eventTimeWatermark | Used mainly in <> to create a LessThanOrEqual Catalyst binary expression that matches rows older than the watermark.

| [[keyExpressions]] keyExpressions | Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in physical-operators/[StreamingDeduplicateExec]) or key attributes (in[StateStoreSaveExec]) with at most one that may have spark.watermarkDelayMs watermark attribute in metadata

Used in <> to create a Predicate to match rows older than the event time watermark.

Used also when[StateStoreSaveExec] and physical-operators/[StreamingDeduplicateExec] physical operators are executed. |===

=== [[removeKeysOlderThanWatermark]][[removeKeysOlderThanWatermark-StateStore]] Removing Keys From StateStore Older Than Watermark -- removeKeysOlderThanWatermark Method

[source, scala]

removeKeysOlderThanWatermark(store: StateStore): Unit

removeKeysOlderThanWatermark requests the input store for all rows.

removeKeysOlderThanWatermark then uses watermarkPredicateForKeys to remove matching rows from the store.

removeKeysOlderThanWatermark is used when StreamingDeduplicateExec physical operator is requested to execute.

=== [[removeKeysOlderThanWatermark-StreamingAggregationStateManager-store]] removeKeysOlderThanWatermark Method

[source, scala]

removeKeysOlderThanWatermark( storeManager: StreamingAggregationStateManager, store: StateStore): Unit


NOTE: removeKeysOlderThanWatermark is used exclusively when StateStoreSaveExec physical operator is requested to <>.