EventTimeWatermark Logical Operator¶
EventTimeWatermark is a unary logical operator (Spark SQL) that represents the Dataset.withWatermark operator in a logical query plan of a streaming query.
EventTimeWatermark marks a user-specified column as holding the event time for a row.
Creating Instance¶
EventTimeWatermark takes the following to be created:

- User-Defined Event-Time Watermark Column
- Watermark Delay Threshold
- Child LogicalPlan (Spark SQL)
EventTimeWatermark is created when:
- Dataset.withWatermark operator is used
User-Defined Event-Time Watermark Column¶
EventTimeWatermark is given an event-time Attribute (Spark SQL) when created.

The Attribute is an UnresolvedAttribute from the Dataset.withWatermark operator (and is the target of, and can immediately be removed by, the EliminateEventTimeWatermark logical optimization).
The event-time column has to be defined on a window or a timestamp, so the data type of an event-time column can be one of the following:

- StructType with an end field of TimestampType (for windowed aggregation)
- TimestampType
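The two accepted shapes can be sketched with a small stand-in type model (plain Scala, not Spark's org.apache.spark.sql.types classes):

```scala
// Stand-in data type model (not Spark's org.apache.spark.sql.types).
sealed trait DataType
case object TimestampType extends DataType
final case class StructField(name: String, dataType: DataType)
final case class StructType(fields: Seq[StructField]) extends DataType

// An event-time column is valid when it is a timestamp, or a struct with
// an `end` field of timestamp type (the shape of a window column).
def isValidEventTime(dt: DataType): Boolean = dt match {
  case TimestampType      => true
  case StructType(fields) => fields.exists(f => f.name == "end" && f.dataType == TimestampType)
}

assert(isValidEventTime(TimestampType))
assert(isValidEventTime(StructType(Seq(StructField("start", TimestampType), StructField("end", TimestampType)))))
assert(!isValidEventTime(StructType(Seq(StructField("start", TimestampType)))))
```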
EventTimeWatermark is used when:

- ProgressReporter is requested to extract execution statistics (and adds a watermark entry with the batchWatermarkMs of the OffsetSeqMetadata of a micro-batch)
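That statistics entry can be sketched as follows (simplified stand-in types; the ISO-8601 rendering of the watermark is an assumption, Spark's actual formatting may differ):

```scala
import java.time.Instant

// Simplified stand-in for Spark's internal offset metadata.
final case class OffsetSeqMetadata(batchWatermarkMs: Long)

// The `watermark` entry is the batch watermark (epoch millis) rendered
// as a timestamp string.
def eventTimeStats(metadata: OffsetSeqMetadata): Map[String, String] =
  Map("watermark" -> Instant.ofEpochMilli(metadata.batchWatermarkMs).toString)

assert(eventTimeStats(OffsetSeqMetadata(0L)) == Map("watermark" -> "1970-01-01T00:00:00Z"))
```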
Logical Optimizations¶
EliminateEventTimeWatermark¶
EliminateEventTimeWatermark logical optimization removes an EventTimeWatermark logical operator from a logical plan if the child logical operator is not streaming (i.e., when the Dataset.withWatermark operator is used in a batch query).
val logs = spark.
read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
format("text").
load("logs")
// logs is a batch Dataset
assert(!logs.isStreaming)
val q = logs.
withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
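The essence of the rule can be sketched with stand-in plan node classes (plain Scala, not Spark's Catalyst types):

```scala
// Stand-in logical plan nodes (not Spark's Catalyst classes).
sealed trait LogicalPlan { def isStreaming: Boolean }
final case class Relation(name: String, isStreaming: Boolean) extends LogicalPlan
final case class EventTimeWatermark(eventTime: String, child: LogicalPlan) extends LogicalPlan {
  def isStreaming: Boolean = child.isStreaming
}

// EliminateEventTimeWatermark in essence: drop the operator when the
// child is a batch (non-streaming) plan, keep the plan otherwise.
def eliminate(plan: LogicalPlan): LogicalPlan = plan match {
  case EventTimeWatermark(_, child) if !child.isStreaming => child
  case other                                              => other
}

val batch = EventTimeWatermark("timestamp", Relation("text", isStreaming = false))
assert(eliminate(batch) == Relation("text", isStreaming = false))

val streaming = EventTimeWatermark("timestamp", Relation("rate", isStreaming = true))
assert(eliminate(streaming) == streaming)
```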
PushPredicateThroughNonJoin¶
PushPredicateThroughNonJoin can optimize streaming queries with EventTimeWatermark (and push predicates down if they don't reference the eventTime watermark column).
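The pushdown condition can be sketched as follows (simplified; tracking attribute references by column name only is an assumption of this sketch):

```scala
// A predicate with the set of column names it references (simplified).
final case class Predicate(references: Set[String])

// PushPredicateThroughNonJoin in essence: a filter may be pushed beneath
// EventTimeWatermark only when it does not touch the eventTime column.
def canPushThroughWatermark(predicate: Predicate, eventTime: String): Boolean =
  !predicate.references.contains(eventTime)

assert(canPushThroughWatermark(Predicate(Set("value")), eventTime = "timestamp"))
assert(!canPushThroughWatermark(Predicate(Set("timestamp", "value")), eventTime = "timestamp"))
```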
Execution Planning¶
EventTimeWatermark is planned as an EventTimeWatermarkExec physical operator by the StatefulAggregationStrategy execution planning strategy.
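The planning step amounts to a pattern match over logical operators; a stand-in sketch (not Spark's StatefulAggregationStrategy):

```scala
// Stand-in logical and physical nodes (not Spark's classes).
sealed trait LogicalPlan
final case class Relation(name: String) extends LogicalPlan
final case class EventTimeWatermark(eventTime: String, delayMs: Long, child: LogicalPlan) extends LogicalPlan

sealed trait SparkPlan
final case class EventTimeWatermarkExec(eventTime: String, delayMs: Long) extends SparkPlan

// StatefulAggregationStrategy in essence: map the logical watermark
// operator to its physical counterpart.
def planWatermark(plan: LogicalPlan): Option[SparkPlan] = plan match {
  case EventTimeWatermark(col, delayMs, _) => Some(EventTimeWatermarkExec(col, delayMs))
  case _                                   => None
}

assert(planWatermark(EventTimeWatermark("timestamp", 30000L, Relation("rate"))) ==
  Some(EventTimeWatermarkExec("timestamp", 30000L)))
assert(planWatermark(Relation("rate")).isEmpty)
```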
Output Schema¶
output: Seq[Attribute]
output is part of the QueryPlan (Spark SQL) abstraction.
When requested for the output attributes, the EventTimeWatermark logical operator takes the output attributes of the child logical operator and finds the column that matches the eventTime attribute. output updates the Metadata of that column with the spark.watermarkDelayMs key and the watermark delay interval converted to milliseconds.

output removes the spark.watermarkDelayMs key from the other columns (if any).
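A simplified sketch of that behavior (plain Scala maps standing in for Spark's Attribute and Metadata types):

```scala
// Attribute with metadata as a plain map (stand-in for Spark's Attribute/Metadata).
val delayKey = "spark.watermarkDelayMs"
final case class Attr(name: String, metadata: Map[String, Long])

// output in essence: tag the eventTime column with the delay in millis
// and strip the key from every other column.
def output(child: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
  child.map {
    case a if a.name == eventTime => a.copy(metadata = a.metadata + (delayKey -> delayMs))
    case a                        => a.copy(metadata = a.metadata - delayKey)
  }

val attrs = Seq(Attr("timestamp", Map.empty), Attr("value", Map(delayKey -> 10000L)))
val out = output(attrs, eventTime = "timestamp", delayMs = 30000L)
assert(out == Seq(Attr("timestamp", Map(delayKey -> 30000L)), Attr("value", Map.empty)))
```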
// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
Watermark Delay Metadata Marker¶
The spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).