Skip to content

TimeWindow

TimeWindow is an Unevaluable, NonSQLExpression UnaryExpression that represents window function.

import org.apache.spark.sql.functions.window
val w = window(
  'timeColumn,
  windowDuration = "10 seconds",
  slideDuration = "5 seconds",
  startTime = "0 seconds")
scala> println(w.expr.numberedTreeString)
00 window('timeColumn, 10000000, 5000000, 0) AS window#1
01 +- window('timeColumn, 10000000, 5000000, 0)
02    +- 'timeColumn
import org.apache.spark.sql.catalyst.expressions.TimeWindow
val tw = w.expr.children.head.asInstanceOf[TimeWindow]

Creating Instance

TimeWindow takes the following to be created:

  • Time Column (Expression)
  • Window Duration
  • Slide Duration
  • Start Time

TimeWindow is created using apply factory method.

Creating TimeWindow

apply(
  timeColumn: Expression,
  windowDuration: String,
  slideDuration: String,
  startTime: String): TimeWindow

apply creates a TimeWindow (for the given timeColumn expression with the window and slide durations and start time converted to seconds).

apply is used when:

  • window standard function is used

Parsing Time Interval to Microseconds

getIntervalInMicroSeconds(
  interval: String): Long

getIntervalInMicroSeconds...FIXME

getIntervalInMicroSeconds is used when:

Analysis Phase

TimeWindow is resolved to Expand unary logical operator (when TimeWindowing logical evaluation rule is executed).

import java.time.LocalDateTime
import java.sql.Timestamp

val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8,  13, 0, 0, 0), 10),
  ((2017, 5,  27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")
scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"))

// Before Analyzer
scala> println(q.queryExecution.logical.numberedTreeString)
00 'Project [timewindow('time, 5000000, 5000000, 0) AS window#18]
01 +- Project [_1#6 AS time#9, _2#7 AS level#10]
02    +- LocalRelation [_1#6, _2#7]

// After Analyzer
scala> println(q.queryExecution.analyzed.numberedTreeString)
00 Project [window#19 AS window#18]
01 +- Filter ((time#9 >= window#19.start) && (time#9 < window#19.end))
02    +- Expand [List(named_struct(start, ((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), end, (((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000)), time#9, level#10), List(named_struct(start, ((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 5000000) + 0), end, (((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000)), time#9, level#10)], [window#19, time#9, level#10]
03       +- Project [_1#6 AS time#9, _2#7 AS level#10]
04          +- LocalRelation [_1#6, _2#7]