WatermarkSupport Physical Operators

WatermarkSupport is an extension of the SparkPlan (Spark SQL) abstraction for physical operators that support streaming watermarks.

Contract

Child Physical Operator

child: SparkPlan

Used when:

Event-Time Watermark

eventTimeWatermark: Option[Long]

Current value of watermark

Used when:

Key Expressions

keyExpressions: Seq[Attribute]

Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec). At most one of them may be the watermark attribute, i.e. an attribute with the spark.watermarkDelayMs key in its metadata.

Used when:

Implementations

Watermark Expression

watermarkExpression: Option[Expression]
Lazy Value

watermarkExpression is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
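The once-only initialization of a lazy value can be seen in a minimal plain-Scala sketch. The names LazyValDemo, initCount and expensive are hypothetical and are not part of Spark.

```scala
object LazyValDemo {
  // Counts how many times the initializer block runs (hypothetical helper)
  var initCount = 0

  // The block runs on first access only; the result is cached afterwards
  lazy val expensive: Option[Long] = {
    initCount += 1
    Some(42L)
  }

  def main(args: Array[String]): Unit = {
    assert(initCount == 0)    // declared, but not yet initialized
    val first = expensive     // triggers the initializer
    val second = expensive    // reuses the cached value
    assert(initCount == 1)
    assert(first == second)
  }
}
```
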

watermarkExpression is a LessThanOrEqual (Spark SQL) expression.

Note

Use Dataset.withWatermark operator to specify a watermark expression.
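As a minimal sketch of the note above (meant for spark-shell or a Scala script with Spark on the classpath), the following declares an event-time watermark on the built-in rate source. The application name and the 10-second delay are arbitrary choices for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("watermark-demo") // arbitrary name
  .getOrCreate()

// The rate source emits rows with `timestamp` and `value` columns
val events = spark.readStream.format("rate").load()

// Declare `timestamp` as the event-time column with a 10-second delay threshold
val withWm = events.withWatermark("timestamp", "10 seconds")

// The delay should be recorded in the column metadata
// under the spark.watermarkDelayMs key
val md = withWm.schema("timestamp").metadata
println(md.contains("spark.watermarkDelayMs"))
```
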

watermarkExpression creates a watermark expression for the following:


watermarkExpression is used when:

Watermark Predicates

For Keys

watermarkPredicateForKeys: Option[BasePredicate]
Lazy Value

watermarkPredicateForKeys is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

watermarkPredicateForKeys is a BasePredicate (Spark SQL) for the watermark expression. It is defined only when the watermark expression is defined and one of the key expressions has the spark.watermarkDelayMs metadata key.
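How a key expression with the spark.watermarkDelayMs metadata key could be located is sketched below. This is an illustration under assumptions and not the actual Spark code: the attribute names id and eventTime and the 10000 ms delay are made up, and the metadata key is written out as a string literal.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{LongType, MetadataBuilder, TimestampType}

// Metadata key named in the text above
val delayKey = "spark.watermarkDelayMs"

// Metadata as it could look on a watermarked column (the delay value is made up)
val watermarked = new MetadataBuilder().putLong(delayKey, 10000L).build()

// Hypothetical key expressions: a plain key and a watermarked event-time key
val keyExpressions: Seq[Attribute] = Seq(
  AttributeReference("id", LongType)(),
  AttributeReference("eventTime", TimestampType, nullable = true, metadata = watermarked)())

// At most one key is expected to carry the metadata key
val watermarkKey: Option[Attribute] =
  keyExpressions.find(_.metadata.contains(delayKey))

println(watermarkKey.map(_.name))
```
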


watermarkPredicateForKeys is used when:

For Data

watermarkPredicateForData: Option[BasePredicate]
Lazy Value

watermarkPredicateForData is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.

watermarkPredicateForData is a BasePredicate (Spark SQL) for the watermark expression (if defined).

Note

watermarkPredicateForData is created only when the watermark expression is defined.


watermarkPredicateForData is used when:

removeKeysOlderThanWatermark

removeKeysOlderThanWatermark(
  store: StateStore): Unit
removeKeysOlderThanWatermark(
  storeManager: StreamingAggregationStateManager,
  store: StateStore): Unit

removeKeysOlderThanWatermark...FIXME


removeKeysOlderThanWatermark is used when:

  • StreamingDeduplicateExec is executed (just before finishing up)
  • StateStoreSaveExec is executed (just before finishing up in Update output mode)

watermarkExpression

watermarkExpression(
  optionalWatermarkExpression: Option[Expression],
  optionalWatermarkMs: Option[Long]): Option[Expression]

watermarkExpression is None (undefined) when either the given optionalWatermarkExpression or the optionalWatermarkMs is undefined.

In other words, watermarkExpression creates an Expression (LessThanOrEqual, precisely) when both optionalWatermarkExpression and optionalWatermarkMs are specified.

watermarkExpression takes the watermark attribute from the given optionalWatermarkExpression.

For a watermark attribute of StructType (which means it is a window), watermarkExpression creates a LessThanOrEqual that compares the end of the window with the optionalWatermarkMs.

For a watermark attribute of any other type, watermarkExpression creates a LessThanOrEqual that compares the attribute itself with the optionalWatermarkMs.

FIXME Demo
import org.apache.spark.sql.execution.streaming.WatermarkSupport

import org.apache.spark.sql.functions.window
val w = window(timeColumn = $"time", windowDuration = "5 seconds")

val optionalWatermarkExpression = Some(w.expr)
val optionalWatermarkMs = Some(5L)

WatermarkSupport.watermarkExpression(optionalWatermarkExpression, optionalWatermarkMs)

// FIXME Resolve Catalyst expressions
// org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
//   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:137)
//   at org.apache.spark.sql.catalyst.expressions.TimeWindow.dataType(TimeWindow.scala:101)
//   at org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:166)
//   at org.apache.spark.sql.execution.streaming.WatermarkSupport$.watermarkExpression(statefulOperators.scala:276)
//   ... 52 elided
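
The demo above fails because window(...) over $"time" is an unresolved expression with no data type yet. One possible workaround, sketched here under assumptions, is to pass already-resolved attributes. The attribute names time and window and the start/end struct layout are illustrative, and the import path of WatermarkSupport may differ across Spark versions.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, LessThanOrEqual}
import org.apache.spark.sql.execution.streaming.WatermarkSupport
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

// A resolved, non-struct event-time attribute
val timeAttr = AttributeReference("time", TimestampType)()

// A resolved struct attribute shaped like a time window (start, end)
val windowAttr = AttributeReference("window", StructType(Seq(
  StructField("start", TimestampType),
  StructField("end", TimestampType))))()

val optionalWatermarkMs = Some(5L)

// Both arguments defined: a LessThanOrEqual expression is expected in each case
val forTime = WatermarkSupport.watermarkExpression(Some(timeAttr), optionalWatermarkMs)
val forWindow = WatermarkSupport.watermarkExpression(Some(windowAttr), optionalWatermarkMs)

// Watermark value missing: no expression is expected
val undefined = WatermarkSupport.watermarkExpression(Some(timeAttr), None)

println(forTime)
println(forWindow)
println(undefined)
```
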

watermarkExpression is used when: