== EventTimeWatermark Unary Logical Operator
`EventTimeWatermark` is a unary logical operator that is created to represent the `Dataset.withWatermark` operator in a logical query plan.
[NOTE]
====
A unary logical operator (`UnaryNode`) is a logical operator with a single child logical operator.

Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan.html[UnaryNode] (and logical operators in general) in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] book.
====
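When requested for the output attributes, `EventTimeWatermark` updates the metadata of the event-time column in the output of the child logical operator to include the `spark.watermarkDelayMs` key with the watermark delay (in milliseconds).

`EventTimeWatermark` is resolved (planned) to the `EventTimeWatermarkExec` physical operator by the `StatefulAggregationStrategy` execution planning strategy.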
[NOTE]
====
`EliminateEventTimeWatermark` logical optimization rule (i.e. `Rule[LogicalPlan]`) removes the `EventTimeWatermark` logical operator from a logical plan if the child logical operator is not streaming, i.e. when the `Dataset.withWatermark` operator is used on a batch query.

[source, scala]
----
val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
assert(!logs.isStreaming)

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark

scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
----
====
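The elimination logic can be pictured with a tiny, self-contained model. This is a sketch under stated assumptions: `Plan`, `Relation`, `Watermark` and `eliminateWatermark` are hypothetical names, not Spark classes, and the rule is reduced to the single condition described above (drop the watermark node when its child is not streaming).

```scala
// Toy logical plan (hypothetical types, not Spark's classes)
sealed trait Plan { def isStreaming: Boolean }

// A leaf relation that is either a batch or a streaming source
final case class Relation(name: String, isStreaming: Boolean) extends Plan

// Stand-in for EventTimeWatermark: column name, delay (ms) and child plan
final case class Watermark(eventTime: String, delayMs: Long, child: Plan) extends Plan {
  def isStreaming: Boolean = child.isStreaming
}

// Mirrors the idea of EliminateEventTimeWatermark:
// process the child first, then drop the watermark node if that child is not streaming
def eliminateWatermark(plan: Plan): Plan = plan match {
  case Watermark(col, delay, child) =>
    val newChild = eliminateWatermark(child)
    if (newChild.isStreaming) Watermark(col, delay, newChild) else newChild
  case leaf: Relation => leaf
}

val batch = Watermark("timestamp", 30000L, Relation("logs", isStreaming = false))
val streaming = Watermark("timestamp", 30000L, Relation("rate", isStreaming = true))
```

Applied to `batch`, the model returns the bare `Relation`, much like the batch example above where only `Relation[value#0] text` is left; `streaming` comes back unchanged.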
=== [[creating-instance]] Creating EventTimeWatermark Instance
`EventTimeWatermark` takes the following to be created:

- [[eventTime]] Watermark column (`Attribute`)
- [[delay]] Watermark delay (`CalendarInterval`)
- [[child]] Child logical operator (`LogicalPlan`)
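To make the three constructor arguments concrete, here is a toy sketch with hypothetical stand-in types (`Attr`, `Interval`, `LogicalOp`, `Scan`, `EventTimeWatermarkModel`, `WatermarkExecModel`), none of which are Spark's. It loosely mirrors how a planning strategy such as `StatefulAggregationStrategy` can pattern-match on the watermark column, the delay and the child to build the physical operator. In Spark the child is planned separately, which this model skips.

```scala
// Hypothetical stand-ins for Spark's Attribute, CalendarInterval and LogicalPlan
final case class Attr(name: String)
final case class Interval(months: Int, microseconds: Long)

sealed trait LogicalOp
final case class Scan(source: String) extends LogicalOp

// Toy version of EventTimeWatermark with the same three constructor arguments
final case class EventTimeWatermarkModel(
    eventTime: Attr,   // watermark column
    delay: Interval,   // watermark delay
    child: LogicalOp   // child logical operator
) extends LogicalOp

// Toy physical operator that receives the same three pieces
final case class WatermarkExecModel(eventTime: Attr, delay: Interval, child: LogicalOp)

// Destructure the logical operator into its three arguments; other operators are not handled
def planWatermark(op: LogicalOp): Option[WatermarkExecModel] = op match {
  case EventTimeWatermarkModel(eventTime, delay, child) =>
    Some(WatermarkExecModel(eventTime, delay, child))
  case _ => None
}

// 10 seconds expressed as 0 months and 10,000,000 microseconds
val wm = EventTimeWatermarkModel(Attr("timestamp"), Interval(0, 10000000L), Scan("rate"))
```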
=== [[output]] Output Schema -- `output` Property
[source, scala]
----
output: Seq[Attribute]
----
NOTE: `output` is part of the `QueryPlan` Contract to describe the attributes of (the schema of) the output.
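`output` finds the `eventTime` column in the output schema of the child logical operator and updates the `Metadata` of the column with the `spark.watermarkDelayMs` key and the watermark delay converted to milliseconds (using `getDelayMs`).

`output` removes the `spark.watermarkDelayMs` key from the other columns (if present).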
[source, scala]
----
// Assumption: q is a streaming query with a watermark (not defined in this section), e.g.
// val q = spark.readStream.format("rate").load.withWatermark("timestamp", "10 seconds")
// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]

scala> etw.output.toStructType.printTreeString
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
----
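The metadata rewrite performed by `output` can be sketched in plain Scala. Assumptions: `Attribute` here is a hypothetical case class with a `Map[String, Long]` in place of Spark's `Metadata`, and `watermarkOutput` is a hypothetical helper that matches the event-time column by name, whereas Spark compares attributes with `semanticEquals`.

```scala
// Hypothetical, simplified attribute: a name plus a metadata map
final case class Attribute(name: String, metadata: Map[String, Long])

// Same key string as the spark.watermarkDelayMs marker
val DelayKey = "spark.watermarkDelayMs"

// Sketch of the output logic:
// - tag the event-time column with the delay in milliseconds
// - strip an earlier watermark marker from every other column
def watermarkOutput(childOutput: Seq[Attribute], eventTime: String, delayMs: Long): Seq[Attribute] =
  childOutput.map { a =>
    if (a.name == eventTime) a.copy(metadata = a.metadata + (DelayKey -> delayMs))
    else if (a.metadata.contains(DelayKey)) a.copy(metadata = a.metadata - DelayKey)
    else a
  }

// Child output where an earlier watermark already tagged "ts0"
val childOutput = Seq(
  Attribute("ts0", Map(DelayKey -> 5000L)),
  Attribute("timestamp", Map.empty),
  Attribute("value", Map.empty))

val out = watermarkOutput(childOutput, eventTime = "timestamp", delayMs = 10000L)
```

The second branch is why only one column carries the marker at a time: a newer watermark replaces an older one.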
=== [[watermarkDelayMs]][[delayKey]] Watermark Metadata (Marker) -- `spark.watermarkDelayMs` Metadata Key
`spark.watermarkDelayMs` metadata key is used to mark one of the output attributes as the watermark attribute (the event-time watermark column).
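A consumer of the plan can locate the watermark column by looking for the marker key. The sketch below uses a hypothetical `Col` type and `watermarkColumn` helper, with a plain `Map` standing in for Spark's `Metadata` (which does offer `contains` and `getLong`).

```scala
// Hypothetical simplified column: a name plus a metadata map
final case class Col(name: String, metadata: Map[String, Long])

val DelayKey = "spark.watermarkDelayMs"

// Return the first column marked as the event-time watermark, with its delay in ms
def watermarkColumn(output: Seq[Col]): Option[(String, Long)] =
  output.collectFirst {
    case Col(name, md) if md.contains(DelayKey) => (name, md(DelayKey))
  }

val schema = Seq(Col("timestamp", Map(DelayKey -> 30000L)), Col("value", Map.empty))
```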
=== [[getDelayMs]] Converting Human-Friendly CalendarInterval to Milliseconds -- `getDelayMs` Object Method
[source, scala]
----
getDelayMs(delay: CalendarInterval): Long
----
`getDelayMs`...FIXME

NOTE: `getDelayMs` is used when...FIXME
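The conversion itself is not described above, so the following is only a sketch under an assumption: to my understanding, Spark counts a month as 31 days when turning the interval into milliseconds (verify this against the Spark version you use). `Interval` is a hypothetical stand-in for `CalendarInterval` with just months and microseconds. Spark 3.x's `CalendarInterval` also has a `days` field, which this sketch ignores.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for CalendarInterval: months plus microseconds
final case class Interval(months: Int, microseconds: Long)

// Assumption: a month counts as 31 days
val MillisPerDay: Long = TimeUnit.DAYS.toMillis(1)
val MillisPerMonth: Long = MillisPerDay * 31

// Microseconds are truncated to whole milliseconds, months are added as 31-day blocks
def getDelayMs(delay: Interval): Long =
  TimeUnit.MICROSECONDS.toMillis(delay.microseconds) + delay.months * MillisPerMonth
```

With this definition, a 30-second delay (30,000,000 microseconds) becomes 30000 ms, and one month becomes 2678400000 ms (31 days).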