EventTimeWatermark Logical Operator

EventTimeWatermark is a unary logical operator (Spark SQL) that represents the Dataset.withWatermark operator in a logical query plan of a streaming query.

EventTimeWatermark marks a user-specified column as holding the event time for a row.

Creating Instance

EventTimeWatermark takes the following to be created:

  • Event-time watermark Attribute (Spark SQL)
  • Watermark delay interval (CalendarInterval)
  • Child logical operator (LogicalPlan)

EventTimeWatermark is created when the Dataset.withWatermark operator is used.

User-Defined Event-Time Watermark Column

EventTimeWatermark is given an event-time Attribute (Spark SQL) when created.

The Attribute is an UnresolvedAttribute from the Dataset.withWatermark operator (and is the target of the EliminateEventTimeWatermark logical optimization, which can remove EventTimeWatermark immediately).

The event-time column has to be defined on a window or a timestamp, so its data type can be one of the following:

  • StructType with end field of TimestampType type (for windowed aggregation)
  • TimestampType
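The two allowed shapes above can be sketched as a small type check. This is a simplified model in plain Scala (the `DataType` hierarchy below is a stand-in, not Spark's actual Catalyst types):

```scala
// A simplified model (not Spark's Catalyst types) of the rule that an
// event-time column must be a plain timestamp or a window struct whose
// `end` field is a timestamp.
sealed trait DataType
case object TimestampType extends DataType
case object LongType extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

def isValidEventTimeType(dt: DataType): Boolean = dt match {
  case TimestampType => true // plain timestamp column
  case StructType(fields) => // windowed aggregation: window.end must be a timestamp
    fields.exists(f => f.name == "end" && f.dataType == TimestampType)
  case _ => false
}
```

With this model, a `window` struct produced by a windowed aggregation qualifies because of its `end: TimestampType` field, while e.g. a plain long column does not.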

EventTimeWatermark is used in the logical optimizations and execution planning described below.

Logical Optimizations

EliminateEventTimeWatermark

EliminateEventTimeWatermark logical optimization removes EventTimeWatermark logical operator from a logical plan if the child logical operator is not streaming (i.e., when Dataset.withWatermark operator is used in a batch query).

val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text

PushPredicateThroughNonJoin

PushPredicateThroughNonJoin can optimize streaming queries with EventTimeWatermark (and push predicates down if they don't reference the eventTime column).
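The pushdown condition can be sketched as a predicate on referenced column names. This is a minimal sketch in plain Scala (the `Predicate` class and `canPushThroughWatermark` helper are illustrative simplifications, not Spark's actual rule code):

```scala
// Simplified sketch of the PushPredicateThroughNonJoin decision for
// EventTimeWatermark: a predicate may be pushed below the watermark
// operator only if it does not reference the event-time column.
case class Predicate(referencedColumns: Set[String])

def canPushThroughWatermark(predicate: Predicate, eventTimeColumn: String): Boolean =
  !predicate.referencedColumns.contains(eventTimeColumn)
```

For example, with `"timestamp"` as the event-time column, a filter on `value` alone can be pushed down, while a filter that touches `timestamp` has to stay above the watermark.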

Execution Planning

EventTimeWatermark is planned as EventTimeWatermarkExec physical operator by StatefulAggregationStrategy execution planning strategy.

Output Schema

output: Seq[Attribute]

output is part of the QueryPlan (Spark SQL) abstraction.


When requested for the output attributes, EventTimeWatermark scans the output attributes of the child logical operator for the attribute that matches the eventTime attribute and updates its Metadata with the spark.watermarkDelayMs key set to the watermark delay interval (converted to milliseconds).

output removes the spark.watermarkDelayMs key from all the other attributes (if present).
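The metadata rewrite can be sketched as a pure function over named attributes. This is a simplified model in plain Scala (the `Attr` class and metadata-as-a-map representation are illustrative assumptions, not Catalyst's `AttributeReference` and `Metadata`):

```scala
// Simplified sketch of EventTimeWatermark.output: the event-time attribute
// gains the spark.watermarkDelayMs key while every other attribute that
// carries the key loses it.
val delayKey = "spark.watermarkDelayMs"

case class Attr(name: String, metadata: Map[String, Long])

def updateOutput(child: Seq[Attr], eventTime: String, delayMs: Long): Seq[Attr] =
  child.map {
    case a if a.name == eventTime => a.copy(metadata = a.metadata + (delayKey -> delayMs))
    case a                        => a.copy(metadata = a.metadata - delayKey)
  }
```

Removing the key from the other attributes keeps at most one watermark attribute in the output, even if a stale marker survived from an upstream plan.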

// NOTE: q is assumed to be a streaming query with Dataset.withWatermark,
// or EliminateEventTimeWatermark would have already removed the operator
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

// The updated metadata (incl. spark.watermarkDelayMs) is available
// on the matching output Attribute
scala> etw.output.find(_.name == etw.eventTime.name).foreach(a => println(a.metadata))

Watermark Delay Metadata Marker

spark.watermarkDelayMs metadata key is used to mark one of the output attributes as the watermark attribute (eventTime watermark).