WatermarkSupport Physical Operators
WatermarkSupport is an extension of the SparkPlan (Spark SQL) abstraction for physical operators with support for streaming watermarks.
Contract
Child Physical Operator
child: SparkPlan
Used when:
- WatermarkSupport is requested for the watermark expression and the watermark predicate for data
Event-Time Watermark
eventTimeWatermark: Option[Long]
Current value of the event-time watermark (in milliseconds)
Used when:
- WatermarkSupport is requested for the watermark expression
Key Expressions
keyExpressions: Seq[Attribute]
Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in StreamingDeduplicateExec) or key attributes (in StateStoreSaveExec), of which at most one may carry the spark.watermarkDelayMs watermark attribute in its metadata
Used when:
- WatermarkSupport is requested for the watermark predicate for keys (to match rows older than the event-time watermark)
Implementations
- FlatMapGroupsWithStateExec
- SessionWindowStateStoreRestoreExec
- SessionWindowStateStoreSaveExec
- StateStoreSaveExec
- StreamingDeduplicateExec
Watermark Expression
watermarkExpression: Option[Expression]
Lazy Value
watermarkExpression is a Scala lazy value to guarantee that the code to initialize it is executed only once (when accessed for the first time) and that the computed value never changes afterwards.
Learn more in the Scala Language Specification.
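A small, Spark-independent illustration of this once-only initialization follows. It is a sketch only: the class LazyDemo, its initCount counter and the LazyDemoRunner helper are hypothetical names used to count how often a lazy val initializer actually runs.

```scala
// Counts initializer runs to show that a lazy val is computed at most once
class LazyDemo {
  var initCount = 0 // hypothetical counter, incremented by the initializer below

  // Stands in for a lazily computed value such as watermarkExpression
  lazy val watermark: Option[Long] = {
    initCount += 1
    Some(42L)
  }
}

object LazyDemoRunner {
  // Returns (runs before first access, runs after two accesses, both accesses equal?)
  def run(): (Int, Int, Boolean) = {
    val d = new LazyDemo
    val before = d.initCount // 0: the initializer has not run yet
    val first = d.watermark  // first access runs the initializer
    val second = d.watermark // second access returns the cached value
    (before, d.initCount, first == second)
  }
}
```

Since the value is cached after the first access, every later read sees the same result, which is why the watermark-related values in this trait can be computed once per operator instance.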
watermarkExpression is a LessThanOrEqual (Spark SQL) expression.
Note
Use the Dataset.withWatermark operator to specify a watermark expression.
watermarkExpression creates a watermark expression (using the watermarkExpression utility) for the following:
- the first attribute in the output schema of the child operator with the spark.watermarkDelayMs metadata key
- eventTimeWatermark
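The two inputs above can be modeled in plain Scala as follows. This is a simplified sketch, not Spark code: WatermarkAttrModel (a name plus a metadata map) is a hypothetical stand-in for a Catalyst Attribute with its Metadata, and WatermarkInputsSketch is an invented helper.

```scala
// Simplified stand-in for a Catalyst attribute (hypothetical): a column name plus metadata
final case class WatermarkAttrModel(name: String, metadata: Map[String, Long])

object WatermarkInputsSketch {
  // Metadata key that marks the event-time (watermark) column
  val DelayKey = "spark.watermarkDelayMs"

  // First attribute in the child's output that carries the delay metadata key
  def findWatermarkAttribute(childOutput: Seq[WatermarkAttrModel]): Option[WatermarkAttrModel] =
    childOutput.find(_.metadata.contains(DelayKey))

  // Both the watermark attribute and the current event-time watermark must be defined;
  // if either is missing, there is nothing to build a watermark expression from
  def watermarkInputs(
      childOutput: Seq[WatermarkAttrModel],
      eventTimeWatermark: Option[Long]): Option[(WatermarkAttrModel, Long)] =
    for {
      attr <- findWatermarkAttribute(childOutput)
      wm   <- eventTimeWatermark
    } yield (attr, wm)
}
```

The for-comprehension over Option captures the "both or nothing" rule: a missing watermark column or a missing watermark value yields None.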
watermarkExpression is used when:
- WatermarkSupport is requested for the watermark predicates (for keys and data)
Watermark Predicates
For Keys
watermarkPredicateForKeys: Option[BasePredicate]
Lazy Value
watermarkPredicateForKeys is a Scala lazy value to guarantee that the code to initialize it is executed only once (when accessed for the first time) and that the computed value never changes afterwards.
Learn more in the Scala Language Specification.
watermarkPredicateForKeys is a BasePredicate (Spark SQL) for the watermark expression and the key expressions. It is defined only when the watermark expression is defined and one of the key expressions has the spark.watermarkDelayMs metadata key.
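The rule for when a key predicate exists can be sketched with Option.flatMap. Assumptions: KeyAttr, KeyPredicateSketch and the Map-based key row are hypothetical stand-ins for Catalyst types, the predicate is applied directly to the key column that carries the delay key (a simplification), and event times are taken to be in microseconds while the watermark is in milliseconds (hence the multiplication by 1000).

```scala
// Hypothetical simplified key attribute: a name plus whether it carries spark.watermarkDelayMs
final case class KeyAttr(name: String, hasDelayKey: Boolean)

object KeyPredicateSketch {
  // A key row modeled as column name -> value (event times in microseconds)
  type KeyRow = Map[String, Long]

  // Some(predicate) only when a watermark exists AND a key column carries the delay key;
  // otherwise None, mirroring an undefined watermarkPredicateForKeys
  def predicateForKeys(watermarkMs: Option[Long], keys: Seq[KeyAttr]): Option[KeyRow => Boolean] =
    watermarkMs.flatMap { wm =>
      keys.find(_.hasDelayKey).map { wmKey =>
        (row: KeyRow) => row(wmKey.name) <= wm * 1000 // at or before the watermark
      }
    }
}
```

Keys without any watermark column (for example, grouping only by a user id) produce no predicate, so no state can be evicted by key.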
watermarkPredicateForKeys is used when:
- WatermarkSupport is requested to removeKeysOlderThanWatermark
- StateStoreSaveExec is executed (with Append output mode)
For Data
watermarkPredicateForData: Option[BasePredicate]
Lazy Value
watermarkPredicateForData is a Scala lazy value to guarantee that the code to initialize it is executed only once (when accessed for the first time) and that the computed value never changes afterwards.
Learn more in the Scala Language Specification.
watermarkPredicateForData is a BasePredicate (Spark SQL) for the watermark expression (if defined) and the output of the child operator.
Note
watermarkPredicateForData is created only when the watermark expression is defined.
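As a rough model of a data predicate that exists only when a watermark does, the sketch below uses Option.map and, as one possible use, skips input rows the predicate matches (rows at or before the watermark). Event, DataPredicateSketch and dropLate are hypothetical names, and the millisecond-to-microsecond scaling is an assumption of the model.

```scala
// Hypothetical input row: an event time in microseconds plus a payload
final case class Event(timeMicros: Long, payload: String)

object DataPredicateSketch {
  // No watermark means no predicate at all (None), not a predicate that is always false
  def predicateForData(watermarkMs: Option[Long]): Option[Event => Boolean] =
    watermarkMs.map(wm => (e: Event) => e.timeMicros <= wm * 1000)

  // One possible use: drop input rows the predicate matches (i.e. too late to process)
  def dropLate(events: Seq[Event], watermarkMs: Option[Long]): Seq[Event] =
    predicateForData(watermarkMs) match {
      case Some(isLate) => events.filterNot(isLate)
      case None         => events
    }
}
```

Modeling the predicate as an Option keeps "no watermark" distinct from "nothing is late", so callers can skip the filtering step entirely.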
watermarkPredicateForData is used when:
- FlatMapGroupsWithStateExec is requested to processDataWithPartition
- StateStoreSaveExec is executed (with Append output mode)
- SessionWindowStateStoreRestoreExec is executed
- SessionWindowStateStoreSaveExec is executed
- StreamingDeduplicateExec is executed
removeKeysOlderThanWatermark
removeKeysOlderThanWatermark(
store: StateStore): Unit
removeKeysOlderThanWatermark(
storeManager: StreamingAggregationStateManager,
store: StateStore): Unit
removeKeysOlderThanWatermark
...FIXME
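Until the details above are documented, here is only a hypothetical illustration of what an eviction pass driven by a key predicate might look like. It is not Spark's implementation: the mutable Map plays the role of a StateStore, keys are assumed to be event times in microseconds, and EvictionSketch is an invented name.

```scala
import scala.collection.mutable

object EvictionSketch {
  // Hypothetical state store: key event time (microseconds) -> aggregated value.
  // Removes every entry whose key matches the "older than watermark" predicate
  // and returns how many entries were removed (akin to a removed-rows metric).
  def removeKeysOlderThanWatermark(
      store: mutable.Map[Long, Long],
      predicateForKeys: Option[Long => Boolean]): Int =
    predicateForKeys match {
      case None => 0 // no key predicate: nothing is evicted
      case Some(isOld) =>
        val toRemove = store.keys.filter(isOld).toList // materialize before mutating
        toRemove.foreach(k => store.remove(k))
        toRemove.size
    }
}
```

Collecting the matching keys before removing them avoids mutating the map while iterating over it.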
removeKeysOlderThanWatermark is used when:
- StreamingDeduplicateExec is executed (just before finishing up)
- StateStoreSaveExec is executed (just before finishing up in Update output mode)
watermarkExpression
watermarkExpression(
optionalWatermarkExpression: Option[Expression],
optionalWatermarkMs: Option[Long]): Option[Expression]
watermarkExpression is None (undefined) when either the given optionalWatermarkExpression or optionalWatermarkMs is undefined.
In other words, watermarkExpression creates an Expression (a LessThanOrEqual, precisely) only when both optionalWatermarkExpression and optionalWatermarkMs are specified.
watermarkExpression takes the watermark attribute from the given optionalWatermarkExpression.
For a watermark attribute of StructType (which means it is a window), watermarkExpression compares the end of the window (the second field of the struct) with optionalWatermarkMs (converted to microseconds) using LessThanOrEqual.
For a watermark attribute of any other type, watermarkExpression compares the attribute itself with optionalWatermarkMs (converted to microseconds) using LessThanOrEqual.
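The behavior described above can be modeled without Catalyst. In this sketch (all names hypothetical), WatermarkColumn stands for a value of the watermark attribute, WindowColumn models the window struct, and the window check is made per value rather than once from the attribute's data type. Times are assumed to be in microseconds and the watermark in milliseconds.

```scala
// Hypothetical value of the watermark attribute (times in microseconds)
sealed trait WatermarkColumn
// A plain event-time column
final case class TimeColumn(micros: Long) extends WatermarkColumn
// A window struct (start, end); only its end matters for the comparison
final case class WindowColumn(startMicros: Long, endMicros: Long) extends WatermarkColumn

object WatermarkExpressionSketch {
  // None unless both inputs are defined; otherwise a "<=" check against the watermark,
  // scaled from milliseconds to microseconds
  def watermarkExpression(
      optionalWatermarkColumn: Option[String], // hypothetical: only its presence matters here
      optionalWatermarkMs: Option[Long]): Option[WatermarkColumn => Boolean] =
    if (optionalWatermarkColumn.isEmpty || optionalWatermarkMs.isEmpty) None
    else {
      val thresholdMicros = optionalWatermarkMs.get * 1000
      Some { (c: WatermarkColumn) =>
        c match {
          case TimeColumn(t)        => t <= thresholdMicros   // compare the attribute itself
          case WindowColumn(_, end) => end <= thresholdMicros // compare the end of the window
        }
      }
    }
}
```

Using the end of a window means a window is considered old only once all of it lies at or before the watermark, not merely its start.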
Demo
import org.apache.spark.sql.execution.streaming.WatermarkSupport
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
// watermarkExpression needs resolved expressions (with a known dataType),
// so use attribute references rather than an unresolved window(...) column
val windowType = StructType(Seq(
  StructField("start", TimestampType),
  StructField("end", TimestampType)))
val w = AttributeReference("window", windowType)()
val optionalWatermarkExpression = Some(w)
val optionalWatermarkMs = Some(5L)
// StructType (window) attribute: the end of the window is compared with the watermark
WatermarkSupport.watermarkExpression(optionalWatermarkExpression, optionalWatermarkMs)
// Non-StructType attribute: the attribute itself is compared with the watermark
val time = AttributeReference("time", TimestampType)()
WatermarkSupport.watermarkExpression(Some(time), optionalWatermarkMs)
watermarkExpression is used when:
- WatermarkSupport is requested for the watermark expression
- StreamingSymmetricHashJoinExec.OneSideHashJoiner is requested to storeAndJoinWithOtherSide
- StreamingSymmetricHashJoinHelper is requested to getStateWatermarkPredicates