# EventTimeWatermark Logical Operator
EventTimeWatermark is a unary logical operator (Spark SQL) that represents the Dataset.withWatermark operator in a logical query plan of a streaming query.
EventTimeWatermark marks a user-specified column as holding the event time for a row.
## Creating Instance
EventTimeWatermark takes the following to be created:
- User-Defined Event-Time Watermark Column
- Watermark Delay Threshold
- Child LogicalPlan (Spark SQL)
EventTimeWatermark is created when:
- Dataset.withWatermark operator is used
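The delay threshold is given as a duration string (e.g. "30 seconds"). As a rough illustration of how such a threshold boils down to the milliseconds that later land in the watermark metadata, here is a minimal plain-Scala sketch (a hypothetical parser, not Spark's actual CalendarInterval handling):

```scala
// Hypothetical sketch: normalize a delay threshold string such as
// "30 seconds" to milliseconds (the unit used by spark.watermarkDelayMs).
def delayThresholdToMillis(threshold: String): Long = {
  val Array(amount, unit) = threshold.trim.split("\\s+")
  val unitMillis = unit.stripSuffix("s") match {
    case "millisecond" => 1L
    case "second"      => 1000L
    case "minute"      => 60L * 1000
    case "hour"        => 60L * 60 * 1000
    case "day"         => 24L * 60 * 60 * 1000
    case other         => sys.error(s"Unsupported unit: $other")
  }
  amount.toLong * unitMillis
}
```

For example, `delayThresholdToMillis("30 seconds")` yields `30000`.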
## User-Defined Event-Time Watermark Column
EventTimeWatermark is given an event-time Attribute (Spark SQL) when created.
The Attribute is an UnresolvedAttribute that comes from the Dataset.withWatermark operator. (EventTimeWatermark is the target of, and can immediately be removed by, the EliminateEventTimeWatermark logical optimization.)
The event time column has to be defined on a window or a timestamp, so the data type of an event time column can be one of the following:

- StructType with an end field of TimestampType (for windowed aggregation)
- TimestampType
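The typing rule above can be sketched with a tiny model (hypothetical types, not Catalyst's DataType hierarchy): an event-time column is acceptable when it is a timestamp, or a window struct whose end field is a timestamp.

```scala
// Toy model of the event-time column typing rule (not Spark's classes).
sealed trait DataType
case object TimestampType extends DataType
case object LongType extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

def isValidEventTime(dt: DataType): Boolean = dt match {
  case TimestampType => true
  // windowed aggregation: a struct with an `end` field of timestamp type
  case StructType(fields) =>
    fields.exists(f => f.name == "end" && f.dataType == TimestampType)
  case _ => false
}
```

A plain `LongType` column would be rejected, while a `window(start, end)` struct passes because of its `end` timestamp field.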
EventTimeWatermark is used when:
- ProgressReporter is requested to extract execution statistics (that adds a watermark entry with the batchWatermarkMs of the OffsetSeqMetadata of a micro-batch)
## Logical Optimizations
### EliminateEventTimeWatermark
EliminateEventTimeWatermark logical optimization removes EventTimeWatermark logical operator from a logical plan if the child logical operator is not streaming (i.e., when Dataset.withWatermark operator is used in a batch query).
```text
val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark

scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
```
### PushPredicateThroughNonJoin
PushPredicateThroughNonJoin can optimize streaming queries with EventTimeWatermark (and push predicates down if they don't reference the eventTime column).
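The pushdown condition can be sketched in plain Scala (a toy model, not Catalyst's actual PushPredicateThroughNonJoin rule): a Filter directly above EventTimeWatermark is pushed below it only when the predicate does not reference the eventTime column.

```scala
// Toy logical plan (hypothetical types, not Catalyst's).
sealed trait Plan
case class Relation(columns: Set[String]) extends Plan
case class EventTimeWatermark(eventTime: String, child: Plan) extends Plan
case class Filter(references: Set[String], child: Plan) extends Plan

// Push a Filter below EventTimeWatermark unless it touches the eventTime column.
def pushPredicateThroughWatermark(plan: Plan): Plan = plan match {
  case Filter(refs, EventTimeWatermark(eventTime, child)) if !refs.contains(eventTime) =>
    EventTimeWatermark(eventTime, Filter(refs, child))
  case other => other
}
```

A filter on `value` is pushed below the watermark node; a filter on `timestamp` (the eventTime column) stays above it.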
## Execution Planning
EventTimeWatermark is planned as EventTimeWatermarkExec physical operator by StatefulAggregationStrategy execution planning strategy.
## Output Schema
```scala
output: Seq[Attribute]
```
output is part of the QueryPlan (Spark SQL) abstraction.
When requested for the output attributes, EventTimeWatermark finds the attribute that matches the eventTime column among the output attributes of the child logical operator and updates its Metadata with the spark.watermarkDelayMs key set to the watermark delay interval (converted to milliseconds).

output removes the spark.watermarkDelayMs key from all the other columns (if present).
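A plain-Scala sketch of this behavior (with a hypothetical Attribute type, not Catalyst's Attribute/Metadata classes):

```scala
val delayKey = "spark.watermarkDelayMs"

// Hypothetical stand-in for Catalyst's Attribute with Metadata as a plain Map.
case class Attribute(name: String, metadata: Map[String, Long])

// Stamp the delay on the eventTime column; strip the key everywhere else.
def output(child: Seq[Attribute], eventTime: String, delayMs: Long): Seq[Attribute] =
  child.map { a =>
    if (a.name == eventTime) a.copy(metadata = a.metadata + (delayKey -> delayMs))
    else a.copy(metadata = a.metadata - delayKey)
  }
```

Note that stripping the key from the other columns keeps at most one watermark attribute in the output, even if an upstream EventTimeWatermark had marked a different column.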
```text
// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
```
## Watermark Delay Metadata Marker
spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).
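A sketch of how a downstream operator could locate the watermark attribute by that key (again with a hypothetical Attribute type, not Catalyst's):

```scala
// Hypothetical stand-in for Catalyst's Attribute with Metadata as a plain Map.
case class Attribute(name: String, metadata: Map[String, Long])

// The watermark attribute is the one whose metadata carries the marker key.
def findEventTimeColumn(output: Seq[Attribute]): Option[Attribute] =
  output.find(_.metadata.contains("spark.watermarkDelayMs"))
```

Given the output schema from the example above, only the `timestamp` attribute carries the key, so it is the one returned.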