WindowExec Unary Physical Operator¶
WindowExec is a WindowExecBase unary physical operator for window function execution.
WindowExec represents the Window unary logical operator at execution time.
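A quick way to see the operator in action is to run a query with a window function and collect WindowExec from the executed plan. The following is a minimal sketch (the q and windowExecs names are illustrative; it assumes a spark-shell session with adaptive query execution disabled, as in the demos below):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}
import org.apache.spark.sql.execution.window.WindowExec
// a query with a window function: rank over partitions by bucket, ordered by id
val q = spark.range(10)
  .withColumn("bucket", col("id") % 3)
  .withColumn("rank", rank().over(Window.partitionBy(col("bucket")).orderBy(col("id"))))
// WindowExec is printed as Window in the physical plan
q.explain
// collect the WindowExec operator(s) from the executed plan
val windowExecs = q.queryExecution.executedPlan.collect { case w: WindowExec => w }
// expect exactly one WindowExec here; with adaptive query execution enabled
// the operator is nested inside AdaptiveSparkPlanExec instead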
Creating Instance¶
WindowExec takes the following to be created:
- Window NamedExpressions
- Partition Specification (Expressions)
- SortOrders
- Child SparkPlan
WindowExec is created when:
- Window execution planning strategy plans a Window unary logical operator with a WindowFunction or an AggregateFunction
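The constructor arguments are then available as fields on the planned operator. A short sketch that reuses the windowExecs value from the sketch above:
val we = windowExecs.head
we.windowExpression   // window NamedExpressions
we.partitionSpec      // partition specification expressions
we.orderSpec          // SortOrders
we.child              // child physical operator (SparkPlan)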
Configuration Properties¶
WindowExec uses the following configuration properties when executed.
spark.sql.windowExec.buffer.in.memory.threshold¶
Threshold for the number of rows guaranteed to be held in memory by the window operator.
spark.sql.windowExec.buffer.spill.threshold¶
Threshold for the number of rows buffered by the window operator before spilling to disk.
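Both properties can be set when building a SparkSession. A minimal sketch (the property values below are purely illustrative, not tuning recommendations):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("windowexec-buffer-thresholds")  // illustrative application name
  .master("local[*]")
  .config("spark.sql.windowExec.buffer.in.memory.threshold", "4096")
  .config("spark.sql.windowExec.buffer.spill.threshold", "1000000")
  .getOrCreate()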
Executing Physical Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute requests the child physical operator to execute and maps over the InternalRows in every partition (using the RDD.mapPartitions operator).
Note
When executed, doExecute creates a new MapPartitionsRDD over the RDD[InternalRow] of the child physical operator.
// assuming we is the WindowExec operator created in the Demo section below
scala> :type we
org.apache.spark.sql.execution.window.WindowExec
val windowRDD = we.execute
scala> :type windowRDD
org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
scala> println(windowRDD.toDebugString)
(200) MapPartitionsRDD[5] at execute at <console>:35 []
| MapPartitionsRDD[4] at execute at <console>:35 []
| ShuffledRowRDD[3] at execute at <console>:35 []
+-(7) MapPartitionsRDD[2] at execute at <console>:35 []
| MapPartitionsRDD[1] at execute at <console>:35 []
| ParallelCollectionRDD[0] at execute at <console>:35 []
In every partition, doExecute creates an Iterator[InternalRow] (of UnsafeRows, to be exact).
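To convince yourself that the rows are indeed UnsafeRows, you can inspect the RDD directly. A small sketch that reuses the windowRDD value from the snippet above:
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
// every InternalRow produced by WindowExec is expected to be an UnsafeRow
windowRDD.map(_.isInstanceOf[UnsafeRow]).distinct.collect
// expected: Array(true)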
Demo¶
// arguably the most trivial example:
// a small dataset with just a few rows per group
// to demo how partitions and frames work
// note that the rows of a group are not consecutive (see the middle rows)
val metrics = Seq(
(0, 0, 0), (1, 0, 1), (2, 5, 2), (3, 0, 3), (4, 0, 1), (5, 5, 3), (6, 5, 0)
).toDF("id", "device", "level")
scala> metrics.show
+---+------+-----+
| id|device|level|
+---+------+-----+
| 0| 0| 0|
| 1| 0| 1|
| 2| 5| 2| // <-- this row for device 5 is among the rows of device 0
| 3| 0| 3| // <-- as above but for device 0
| 4| 0| 1| // <-- almost as above but there is a group of two rows for device 0
| 5| 5| 3|
| 6| 5| 0|
+---+------+-----+
// create windows of rows to use window aggregate function over every window
import org.apache.spark.sql.expressions.Window
val rangeWithTwoDevicesById = Window.
partitionBy('device).
orderBy('id).
rangeBetween(start = -1, end = Window.currentRow) // <-- demo rangeBetween first
val sumOverRange = metrics.withColumn("sum", sum('level) over rangeWithTwoDevicesById)
// Logical plan with Window unary logical operator
val optimizedPlan = sumOverRange.queryExecution.optimizedPlan
scala> println(optimizedPlan)
Window [sum(cast(level#9 as bigint)) windowspecdefinition(device#8, id#7 ASC NULLS FIRST, RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum#15L], [device#8], [id#7 ASC NULLS FIRST]
+- LocalRelation [id#7, device#8, level#9]
// Physical plan with WindowExec unary physical operator (shown as Window)
scala> sumOverRange.explain
== Physical Plan ==
Window [sum(cast(level#9 as bigint)) windowspecdefinition(device#8, id#7 ASC NULLS FIRST, RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum#15L], [device#8], [id#7 ASC NULLS FIRST]
+- *Sort [device#8 ASC NULLS FIRST, id#7 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(device#8, 200)
+- LocalTableScan [id#7, device#8, level#9]
// Going fairly low-level...you've been warned
val plan = sumOverRange.queryExecution.executedPlan
import org.apache.spark.sql.execution.window.WindowExec
val we = plan.asInstanceOf[WindowExec]
val windowRDD = we.execute()
scala> :type windowRDD
org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
scala> windowRDD.toDebugString
res0: String =
(200) MapPartitionsRDD[5] at execute at <console>:35 []
| MapPartitionsRDD[4] at execute at <console>:35 []
| ShuffledRowRDD[3] at execute at <console>:35 []
+-(7) MapPartitionsRDD[2] at execute at <console>:35 []
| MapPartitionsRDD[1] at execute at <console>:35 []
| ParallelCollectionRDD[0] at execute at <console>:35 []
// no computation on the source dataset has really occurred
// Let's trigger a RDD action
scala> windowRDD.first
res0: org.apache.spark.sql.catalyst.InternalRow = [0,2,5,2,2]
scala> windowRDD.foreach(println)
[0,2,5,2,2]
[0,0,0,0,0]
[0,5,5,3,3]
[0,6,5,0,3]
[0,1,0,1,1]
[0,3,0,3,3]
[0,4,0,1,4]
scala> sumOverRange.show
+---+------+-----+---+
| id|device|level|sum|
+---+------+-----+---+
| 2| 5| 2| 2|
| 5| 5| 3| 3|
| 6| 5| 0| 3|
| 0| 0| 0| 0|
| 1| 0| 1| 1|
| 3| 0| 3| 3|
| 4| 0| 1| 4|
+---+------+-----+---+
// use rowsBetween
val rowsWithTwoDevicesById = Window.
partitionBy('device).
orderBy('id).
rowsBetween(start = -1, end = Window.currentRow)
val sumOverRows = metrics.withColumn("sum", sum('level) over rowsWithTwoDevicesById)
// let's look at the result first so it is easy
// to compare row-based and range-based window frames
scala> sumOverRows.show
+---+------+-----+---+
| id|device|level|sum|
+---+------+-----+---+
| 2| 5| 2| 2|
| 5| 5| 3| 5| <-- a difference
| 6| 5| 0| 3|
| 0| 0| 0| 0|
| 1| 0| 1| 1|
| 3| 0| 3| 4| <-- another difference
| 4| 0| 1| 4|
+---+------+-----+---+
val rowsOptimizedPlan = sumOverRows.queryExecution.optimizedPlan
scala> println(rowsOptimizedPlan)
Window [sum(cast(level#901 as bigint)) windowspecdefinition(device#900, id#899 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum#1458L], [device#900], [id#899 ASC NULLS FIRST]
+- LocalRelation [id#899, device#900, level#901]
scala> sumOverRows.explain
== Physical Plan ==
Window [sum(cast(level#901 as bigint)) windowspecdefinition(device#900, id#899 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum#1458L], [device#900], [id#899 ASC NULLS FIRST]
+- *Sort [device#900 ASC NULLS FIRST, id#899 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(device#900, 200)
+- LocalTableScan [id#899, device#900, level#901]
// a more involved example
val dataset = spark.range(start = 0, end = 13, step = 1, numPartitions = 4)
import org.apache.spark.sql.expressions.Window
val groupsOrderById = Window.partitionBy('group).rangeBetween(-2, Window.currentRow).orderBy('id)
val query = dataset.
withColumn("group", 'id % 4).
select('*, sum('id) over groupsOrderById as "sum")
scala> query.explain
== Physical Plan ==
Window [sum(id#25L) windowspecdefinition(group#244L, id#25L ASC NULLS FIRST, RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS sum#249L], [group#244L], [id#25L ASC NULLS FIRST]
+- *Sort [group#244L ASC NULLS FIRST, id#25L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(group#244L, 200)
+- *Project [id#25L, (id#25L % 4) AS group#244L]
+- *Range (0, 13, step=1, splits=4)
val plan = query.queryExecution.executedPlan
import org.apache.spark.sql.execution.window.WindowExec
val we = plan.asInstanceOf[WindowExec]
// the whole schema is as follows
val schema = query.queryExecution.executedPlan.output.toStructType
scala> println(schema.treeString)
root
|-- id: long (nullable = false)
|-- group: long (nullable = true)
|-- sum: long (nullable = true)
// Let's see for ourselves how the schema is put together
scala> :type we
org.apache.spark.sql.execution.window.WindowExec
// child's output
scala> println(we.child.output.toStructType.treeString)
root
|-- id: long (nullable = false)
|-- group: long (nullable = true)
// window expressions' output
val weExprSchema = we.windowExpression.map(_.toAttribute).toStructType
scala> println(weExprSchema.treeString)
root
|-- sum: long (nullable = true)
ROWS BETWEEN¶
val vs = Seq(
(1,0), (1,1), (1,3), (2, 0), (2,4)).toDF("gid", "v")
scala> vs.show
+---+---+
|gid| v|
+---+---+
| 1| 0|
| 1| 1|
| 1| 3|
| 2| 0|
| 2| 4|
+---+---+
import org.apache.spark.sql.expressions.Window
val byRowsBetween = Window
.partitionBy('gid)
.orderBy('v)
.rowsBetween(-1, Window.currentRow)
val q = vs.withColumn("vs", collect_list('v).over(byRowsBetween))
scala> q.show
+---+---+---------+
|gid| v| vs|
+---+---+---------+
| 1| 0|[0, 1, 3]|
| 1| 1|[0, 1, 3]|
| 1| 3|[0, 1, 3]|
| 2| 0| [0, 4]|
| 2| 4| [0, 4]|
+---+---+---------+
RANGE BETWEEN¶
val vs = Seq(
(1,0), (1,1), (1,3), (2, 0), (2,4)).toDF("gid", "v")
scala> vs.show
+---+---+
|gid| v|
+---+---+
| 1| 0|
| 1| 1|
| 1| 3|
| 2| 0|
| 2| 4|
+---+---+
import org.apache.spark.sql.expressions.Window
val byRangeBetween = Window
.partitionBy('gid)
.orderBy('v)
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val q = vs.withColumn("vs", collect_list('v).over(byRangeBetween))
scala> q.show
+---+---+---------+
|gid| v| vs|
+---+---+---------+
| 1| 0|[0, 1, 3]|
| 1| 1|[0, 1, 3]|
| 1| 3|[0, 1, 3]|
| 2| 0| [0, 4]|
| 2| 4| [0, 4]|
+---+---+---------+
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.window.WindowExec logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.WindowExec.name = org.apache.spark.sql.execution.window.WindowExec
logger.WindowExec.level = all
Refer to Logging.