TimeWindow
TimeWindow is a UnaryExpression (and an Unevaluable and NonSQLExpression) that represents the window standard function.
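import org.apache.spark.sql.functions.window
import spark.implicits._ // needed for 'timeColumn, $"time" and toDF (spark-shell imports it automatically)
val w = window(
  'timeColumn,
  windowDuration = "10 seconds",
  slideDuration = "5 seconds",
  startTime = "0 seconds")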
scala> println(w.expr.numberedTreeString)
00 window('timeColumn, 10000000, 5000000, 0) AS window#1
01 +- window('timeColumn, 10000000, 5000000, 0)
02    +- 'timeColumn
import org.apache.spark.sql.catalyst.expressions.TimeWindow
val tw = w.expr.children.head.asInstanceOf[TimeWindow]
Creating Instance
TimeWindow takes the following to be created:
- Time Column (Expression)
- Window Duration
- Slide Duration
- Start Time
TimeWindow is created using the apply factory method.
Creating TimeWindow
apply(
  timeColumn: Expression,
  windowDuration: String,
  slideDuration: String,
  startTime: String): TimeWindow
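apply creates a TimeWindow for the given timeColumn expression, with the window duration, slide duration and start time converted to microseconds (using getIntervalInMicroSeconds). That is why "10 seconds" and "5 seconds" show up as 10000000 and 5000000 in the expression tree above.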
apply is used when:
- the window standard function is used
Parsing Time Interval to Microseconds
getIntervalInMicroSeconds(
  interval: String): Long
getIntervalInMicroSeconds...FIXME
getIntervalInMicroSeconds is used when:
- the TimeWindow utility is used to parseExpression and apply
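As a rough illustration of what converting the duration strings to microseconds involves (and not of Spark's actual getIntervalInMicroSeconds), the following minimal sketch parses "<amount> <unit>" pairs. The object IntervalSketch and its toMicros method are hypothetical; the sketch assumes only fixed-length units (microseconds up to weeks), accepts an optional leading "interval" keyword, and rejects everything else, including months and years.

```scala
// Hypothetical sketch, not Spark's getIntervalInMicroSeconds: it only illustrates
// turning duration strings such as "10 seconds" into microseconds.
object IntervalSketch {
  private val MicrosPerSecond = 1000000L

  // Supported units (singular form). Months and years are deliberately left out
  // because they have no fixed length in microseconds.
  private val MicrosPerUnit: Map[String, Long] = Map(
    "microsecond" -> 1L,
    "millisecond" -> 1000L,
    "second" -> MicrosPerSecond,
    "minute" -> 60L * MicrosPerSecond,
    "hour" -> 3600L * MicrosPerSecond,
    "day" -> 86400L * MicrosPerSecond,
    "week" -> 7L * 86400L * MicrosPerSecond
  )

  def toMicros(interval: String): Long = {
    require(interval != null && interval.trim.nonEmpty, "interval cannot be null or blank")
    // Accept an optional leading "interval" keyword, e.g. "interval 10 seconds"
    val tokens = interval.trim.toLowerCase.split("\\s+").toList match {
      case "interval" :: rest => rest
      case all => all
    }
    require(tokens.nonEmpty && tokens.length % 2 == 0, s"cannot parse interval: $interval")
    // Sum "<amount> <unit>" pairs, e.g. "1 minute 30 seconds", failing on overflow
    tokens.grouped(2).map { pair =>
      val amount = pair(0).toLong
      val unit = pair(1).stripSuffix("s")
      val perUnit = MicrosPerUnit.getOrElse(unit,
        throw new IllegalArgumentException(s"unsupported interval unit: ${pair(1)}"))
      Math.multiplyExact(amount, perUnit)
    }.foldLeft(0L)((acc, micros) => Math.addExact(acc, micros))
  }
}
```

For example, toMicros("10 seconds") gives 10000000 and toMicros("5 seconds") gives 5000000, the same values as in the window expression tree, while toMicros("interval 1 minute 30 seconds") gives 90000000.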
Analysis Phase
TimeWindow is resolved to an Expand unary logical operator (followed by a Filter) when the TimeWindowing logical evaluation rule is executed, as the analyzed plan in the following example shows.
import java.time.LocalDateTime
import java.sql.Timestamp
val levels = Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8, 13, 0, 0, 0), 10),
  ((2017, 5, 27, 0, 0, 0), 15)).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")
scala> levels.show
+-------------------+-----+
|               time|level|
+-------------------+-----+
|2012-12-12 12:12:12|    5|
|2012-12-12 12:12:14|    9|
|2012-12-12 13:13:14|    4|
|2016-08-13 00:00:00|   10|
|2017-05-27 00:00:00|   15|
+-------------------+-----+
val q = levels.select(window($"time", "5 seconds"))
// Before Analyzer
scala> println(q.queryExecution.logical.numberedTreeString)
00 'Project [timewindow('time, 5000000, 5000000, 0) AS window#18]
01 +- Project [_1#6 AS time#9, _2#7 AS level#10]
02    +- LocalRelation [_1#6, _2#7]
// After Analyzer
scala> println(q.queryExecution.analyzed.numberedTreeString)
00 Project [window#19 AS window#18]
01 +- Filter ((time#9 >= window#19.start) && (time#9 < window#19.end))
02    +- Expand [List(named_struct(start, ((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), end, (((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000)), time#9, level#10), List(named_struct(start, ((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 5000000) + 0), end, (((((CEIL((cast((precisetimestamp(time#9) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000)), time#9, level#10)], [window#19, time#9, level#10]
03       +- Project [_1#6 AS time#9, _2#7 AS level#10]
04          +- LocalRelation [_1#6, _2#7]
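In the analyzed plan above, Expand emits one candidate window per row and projection, and the Filter keeps only the windows that contain the row's time. The following pure-Scala sketch mirrors that arithmetic, with all values in microseconds as in the plan. The names TimeWindowSketch, Window, candidateWindows and windowsFor are hypothetical, and the number of candidates (ceil(windowDuration / slideDuration) + 1) is an assumption generalized from the two projections this plan shows for a 5-second tumbling window; it is not Spark's TimeWindowing code.

```scala
// Hypothetical sketch (not Spark code) of the Expand + Filter arithmetic in the analyzed plan.
// All times and durations are microseconds, like 5000000 (5 seconds) in the plan.
object TimeWindowSketch {
  final case class Window(start: Long, end: Long)

  // Candidate windows for one timestamp, following the Expand projections:
  //   start = (ceil((t - startTime) / slide) + i - maxNumOverlapping) * slide + startTime
  //   end   = start + windowDuration
  def candidateWindows(t: Long, windowDuration: Long, slideDuration: Long, startTime: Long): Seq[Window] = {
    // Assumption: one projection per i in 0..maxNumOverlapping (2 projections when window == slide)
    val maxNumOverlapping = math.ceil(windowDuration.toDouble / slideDuration).toLong
    val windowId = math.ceil((t - startTime).toDouble / slideDuration).toLong
    (0L to maxNumOverlapping).map { i =>
      val start = (windowId + i - maxNumOverlapping) * slideDuration + startTime
      Window(start, start + windowDuration)
    }
  }

  // Mirrors the Filter above the Expand: keep windows with start <= t < end
  def windowsFor(t: Long, windowDuration: Long, slideDuration: Long, startTime: Long = 0L): Seq[Window] =
    candidateWindows(t, windowDuration, slideDuration, startTime).filter(w => t >= w.start && t < w.end)

  def main(args: Array[String]): Unit = {
    val fiveSeconds = 5000000L
    // 7 seconds past the epoch falls into the tumbling window [5s, 10s)
    println(windowsFor(7000000L, fiveSeconds, fiveSeconds))
    // On a window boundary the candidate that ends at t is filtered out
    println(windowsFor(10000000L, fiveSeconds, fiveSeconds))
    // Sliding 10-second windows every 5 seconds: t belongs to two windows
    println(windowsFor(7000000L, 2 * fiveSeconds, fiveSeconds))
  }
}
```

The second projection (with + 1) is what makes boundary timestamps such as 2016-08-13 00:00:00 land in the window that starts at that instant rather than the one that ends there.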