WatermarkSupport Physical Operators
WatermarkSupport is an extension of the SparkPlan (Spark SQL) abstraction for physical operators that support streaming (event-time) watermarks.
Contract
Child Physical Operator
child: SparkPlan
Used when:
- WatermarkSupport is requested for the watermark expression and the watermark predicate for data
Event-Time Watermark
eventTimeWatermark: Option[Long]
Current value of the event-time watermark (in milliseconds)
Used when:
- WatermarkSupport is requested for the watermark expression
Key Expressions
keyExpressions: Seq[Attribute]
Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec). At most one of them may be the watermark attribute, i.e. have the spark.watermarkDelayMs key in its metadata.
Used when:
- WatermarkSupport is requested for the watermark predicate for keys (to match rows older than the event-time watermark)
Implementations
- FlatMapGroupsWithStateExec
- SessionWindowStateStoreRestoreExec
- SessionWindowStateStoreSaveExec
- StateStoreSaveExec
- StreamingDeduplicateExec
Watermark Expression
watermarkExpression: Option[Expression]
Lazy Value
watermarkExpression is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
watermarkExpression is an optional LessThanOrEqual (Spark SQL) expression.
Note
Use Dataset.withWatermark operator to specify a watermark expression.
watermarkExpression uses the watermarkExpression utility to create a watermark expression for the following:
- the child operator's output attribute (from the output schema of the child operator) with the spark.watermarkDelayMs metadata key
- eventTimeWatermark
watermarkExpression is used when:
- WatermarkSupport is requested for the watermark predicates (for keys and data)
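The derivation above can be pictured with a small plain-Scala model. It is not Spark code: Attr, LessThanOrEqualModel and OperatorModel are hypothetical stand-ins for a Catalyst attribute, the LessThanOrEqual expression and a WatermarkSupport operator. The model assumes the first child output attribute with the spark.watermarkDelayMs metadata key is used, keeps the watermark in milliseconds and ignores the window (struct) case. It also shows the lazy value being initialized once only.

```scala
object WatermarkExpressionModel {
  // Metadata key that marks the event-time watermark attribute
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical stand-in for a Catalyst attribute: a name plus its metadata
  final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

  // Hypothetical stand-in for LessThanOrEqual(attribute, watermark); milliseconds kept as-is
  final case class LessThanOrEqualModel(attr: Attr, watermarkMs: Long)

  // Hypothetical stand-in for a WatermarkSupport operator
  final class OperatorModel(childOutput: Seq[Attr], eventTimeWatermark: Option[Long]) {
    // Counts how many times the lazy value's initializer runs
    var initializations = 0

    // Initialized on first access only; later accesses return the cached value
    lazy val watermarkExpression: Option[LessThanOrEqualModel] = {
      initializations += 1
      for {
        attr <- childOutput.find(_.metadata.contains(DelayKey)) // watermark attribute
        wm <- eventTimeWatermark // current event-time watermark
      } yield LessThanOrEqualModel(attr, wm)
    }
  }

  def main(args: Array[String]): Unit = {
    val op = new OperatorModel(
      childOutput = Seq(Attr("id"), Attr("eventTime", Map(DelayKey -> 10000L))),
      eventTimeWatermark = Some(5000L))
    println(op.watermarkExpression) // first access runs the initializer
    println(op.watermarkExpression) // second access reuses the cached value
    println(s"initialized ${op.initializations} time(s)")
  }
}
```

With no watermark attribute in the child output, or no event-time watermark yet, the model's watermarkExpression is None, mirroring the optional type.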
Watermark Predicates
For Keys
watermarkPredicateForKeys: Option[BasePredicate]
Lazy Value
watermarkPredicateForKeys is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
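watermarkPredicateForKeys is a BasePredicate (Spark SQL) for the watermark expression bound to the key expressions. watermarkPredicateForKeys is defined only when the watermark expression is defined and one of the key expressions has the spark.watermarkDelayMs metadata key.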
watermarkPredicateForKeys is used when:
- WatermarkSupport is requested to removeKeysOlderThanWatermark
- StateStoreSaveExec is executed (with Append output mode)
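A rough plain-Scala model of the predicate for keys follows. Attr, KeyRow and this watermarkPredicateForKeys function are hypothetical simplifications: a key row is a map from column names to values, and the predicate compares the key's watermark column directly with the event-time watermark (in milliseconds) instead of binding a Catalyst expression to the key expressions.

```scala
object KeyPredicateModel {
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical key attribute: a name plus its metadata
  final case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

  // Hypothetical key row: column name -> value (event time in ms for the watermark column)
  type KeyRow = Map[String, Long]

  // Defined only when there is a watermark AND one key attribute carries the delay key
  def watermarkPredicateForKeys(
      keyExpressions: Seq[Attr],
      eventTimeWatermark: Option[Long]): Option[KeyRow => Boolean] =
    eventTimeWatermark.flatMap { wm =>
      keyExpressions.find(_.metadata.contains(DelayKey)).map { attr =>
        (key: KeyRow) => key(attr.name) <= wm // true for keys older than the watermark
      }
    }

  def main(args: Array[String]): Unit = {
    val keys = Seq(Attr("user"), Attr("eventTime", Map(DelayKey -> 0L)))
    val predicate = watermarkPredicateForKeys(keys, Some(1000L))
    val expired = predicate.exists(isOld => isOld(Map("user" -> 1L, "eventTime" -> 900L)))
    println(s"key with eventTime=900 matches: $expired")
  }
}
```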
For Data
watermarkPredicateForData: Option[BasePredicate]
Lazy Value
watermarkPredicateForData is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
watermarkPredicateForData is a BasePredicate (Spark SQL) for the watermark expression (if defined) bound to the output of the child operator.
Note
watermarkPredicateForData is created only when the watermark expression is defined.
watermarkPredicateForData is used when:
- FlatMapGroupsWithStateExec is requested to processDataWithPartition
- StateStoreSaveExec is executed (with Append output mode)
- SessionWindowStateStoreRestoreExec is executed
- SessionWindowStateStoreSaveExec is executed
- StreamingDeduplicateExec is executed
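As one illustration of a predicate for data, the sketch below drops late input rows, i.e. the rows the predicate matches. It assumes a simplified Row with an event-time column in milliseconds; Row, watermarkPredicateForData and dropLateRows are hypothetical names, not Spark API, and the operators listed above use the predicate in their own ways (e.g. filtering input or evicting state).

```scala
object DataPredicateModel {
  // Hypothetical input row with an event-time column in milliseconds
  final case class Row(key: String, eventTimeMs: Long)

  // Defined only when there is a watermark; matches rows at or below the watermark
  def watermarkPredicateForData(eventTimeWatermark: Option[Long]): Option[Row => Boolean] =
    eventTimeWatermark.map(wm => (row: Row) => row.eventTimeMs <= wm)

  // Keeps the rows the predicate does not match (no predicate: keeps every row)
  def dropLateRows(rows: Iterator[Row], predicate: Option[Row => Boolean]): Iterator[Row] =
    predicate match {
      case Some(isLate) => rows.filterNot(isLate)
      case None => rows
    }

  def main(args: Array[String]): Unit = {
    val input = Seq(Row("a", 500L), Row("b", 1500L), Row("c", 1000L))
    val kept = dropLateRows(input.iterator, watermarkPredicateForData(Some(1000L))).toList
    println(kept)
  }
}
```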
removeKeysOlderThanWatermark
removeKeysOlderThanWatermark(
store: StateStore): Unit
removeKeysOlderThanWatermark(
storeManager: StreamingAggregationStateManager,
store: StateStore): Unit
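removeKeysOlderThanWatermark does nothing unless the watermark predicate for keys is defined. Otherwise, removeKeysOlderThanWatermark goes over the keys in the given StateStore (through the given StreamingAggregationStateManager in the second variant) and removes the ones the predicate matches, i.e. the keys older than the event-time watermark.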
removeKeysOlderThanWatermark is used when:
- StreamingDeduplicateExec is executed (just before finishing up)
- StateStoreSaveExec is executed (just before finishing up in Update output mode)
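The removal can be sketched in plain Scala, assuming a mutable map stands in for the StateStore and a function stands in for the watermark predicate for keys. Key and this removeKeysOlderThanWatermark function are hypothetical; returning the number of removed entries is an illustration choice only (the signatures above return Unit).

```scala
import scala.collection.mutable

object RemoveKeysModel {
  // Hypothetical state key: a group plus its event time in milliseconds
  final case class Key(group: String, eventTimeMs: Long)

  // Removes every key matching the predicate; returns how many entries were removed
  def removeKeysOlderThanWatermark(
      store: mutable.Map[Key, String],
      watermarkPredicateForKeys: Option[Key => Boolean]): Int =
    watermarkPredicateForKeys match {
      case None => 0 // no predicate for keys: the store is left untouched
      case Some(isOlderThanWatermark) =>
        // Collect the matching keys first so the map is not modified while iterating it
        val expired = store.keys.filter(isOlderThanWatermark).toList
        expired.foreach(key => store.remove(key))
        expired.size
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map(Key("a", 500L) -> "state-a", Key("b", 2500L) -> "state-b")
    val watermarkMs = 1000L
    val removed = removeKeysOlderThanWatermark(store, Some((key: Key) => key.eventTimeMs <= watermarkMs))
    println(s"removed $removed key(s), remaining: ${store.keySet}")
  }
}
```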
watermarkExpression
watermarkExpression(
optionalWatermarkExpression: Option[Expression],
optionalWatermarkMs: Option[Long]): Option[Expression]
watermarkExpression is None (undefined) when either the given optionalWatermarkExpression or the optionalWatermarkMs is undefined.
In other words, watermarkExpression creates an Expression (a LessThanOrEqual, to be precise) only when both optionalWatermarkExpression and optionalWatermarkMs are specified.
watermarkExpression takes the watermark attribute from the given optionalWatermarkExpression.
If the watermark attribute is of StructType (which means it is a window), watermarkExpression creates a LessThanOrEqual of the end of the window and optionalWatermarkMs.
Otherwise, watermarkExpression creates a LessThanOrEqual of the watermark attribute itself and optionalWatermarkMs.
In both cases, optionalWatermarkMs is converted from milliseconds to microseconds (multiplied by 1000), the unit Catalyst uses for timestamps.
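The rules above can be modeled in plain Scala, without Spark on the classpath. Column, GetField and LessThanOrEqual below are hypothetical case classes, not Catalyst's; isStruct marks a window (struct) attribute, ordinal 1 is assumed to be the window end, and the watermark is assumed to be converted to microseconds by multiplying by 1000.

```scala
object WatermarkExpressionUtilModel {
  sealed trait Expr
  // Hypothetical attribute; isStruct marks a window (struct) attribute
  final case class Column(name: String, isStruct: Boolean) extends Expr
  // Hypothetical struct field access; ordinal 1 is the window end (start is 0)
  final case class GetField(struct: Column, ordinal: Int) extends Expr
  // Hypothetical comparison: left <= watermark (in microseconds)
  final case class LessThanOrEqual(left: Expr, watermarkMicros: Long) extends Expr

  def watermarkExpression(
      optionalWatermarkExpression: Option[Column],
      optionalWatermarkMs: Option[Long]): Option[Expr] =
    for {
      attr <- optionalWatermarkExpression // None if undefined
      watermarkMs <- optionalWatermarkMs // None if undefined
    } yield {
      // Window: compare its end; any other attribute: compare it directly
      val evictionInput: Expr = if (attr.isStruct) GetField(attr, 1) else attr
      LessThanOrEqual(evictionInput, watermarkMs * 1000) // ms -> microseconds
    }

  def main(args: Array[String]): Unit = {
    println(watermarkExpression(Some(Column("window", isStruct = true)), Some(5L)))
    println(watermarkExpression(Some(Column("eventTime", isStruct = false)), None))
  }
}
```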
Demo
import org.apache.spark.sql.execution.streaming.WatermarkSupport
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
// watermarkExpression calls dataType on the watermark attribute, so it needs a resolved expression.
// window($"time", "5 seconds").expr is not resolved (time is an UnresolvedAttribute) and fails with
// UnresolvedException: Invalid call to dataType on unresolved object.
// A resolved AttributeReference of a window-like struct type (start and end timestamps) works instead.
val windowType = StructType(Seq(
  StructField("start", TimestampType),
  StructField("end", TimestampType)))
val windowAttr = AttributeReference("window", windowType)()
val optionalWatermarkExpression = Some(windowAttr)
val optionalWatermarkMs = Some(5L)
val watermarkExpr = WatermarkSupport.watermarkExpression(optionalWatermarkExpression, optionalWatermarkMs)
// A LessThanOrEqual on the end field of the window struct
println(watermarkExpr)
watermarkExpression is used when:
- WatermarkSupport is requested for the watermark expression
- StreamingSymmetricHashJoinExec.OneSideHashJoiner is requested to storeAndJoinWithOtherSide
- StreamingSymmetricHashJoinHelper is requested to getStateWatermarkPredicates