Streaming Aggregation¶
In Spark Structured Streaming, Streaming Aggregation is an operation in a streaming query that was described (build) using the following high-level streaming operators:
-
Dataset.groupBy,
Dataset.rollup
,Dataset.cube
(that all create aRelationalGroupedDataset
(Spark SQL)) -
Dataset.groupByKey (that creates a
KeyValueGroupedDataset
(Spark SQL)) -
SQL's
GROUP BY
clause (includingWITH CUBE
andWITH ROLLUP
(Spark SQL))
Streaming aggregation is part of Stateful Stream Processing.
IncrementalExecution¶
Under the covers, these high-level operators create logical query plans with Aggregate
(Spark SQL) logical operators.
Spark Structured Streaming uses IncrementalExecution for planning streaming queries for execution.
At query planning, IncrementalExecution
uses the StatefulAggregationStrategy execution planning strategy for planning streaming aggregations (Aggregate
unary logical operators) as pairs of StateStoreRestoreExec and StateStoreSaveExec physical operators.
Demo¶
// input data from a data source
// it's rate data source
// but that does not really matter
// We need a streaming Dataset
val input = spark
.readStream
.format("rate")
.load
// Streaming aggregation with groupBy
val counts = input
.groupBy($"value" % 2)
.count
counts.explain(extended = true)
/**
== Parsed Logical Plan ==
'Aggregate [('value % 2)], [('value % 2) AS (value % 2)#23, count(1) AS count#22L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Analyzed Logical Plan ==
(value % 2): bigint, count: bigint
Aggregate [(value#16L % cast(2 as bigint))], [(value#16L % cast(2 as bigint)) AS (value % 2)#23L, count(1) AS count#22L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Optimized Logical Plan ==
Aggregate [(value#16L % 2)], [(value#16L % 2) AS (value % 2)#23L, count(1) AS count#22L]
+- Project [value#16L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Physical Plan ==
*(4) HashAggregate(keys=[(value#16L % 2)#27L], functions=[count(1)], output=[(value % 2)#23L, count#22L])
+- StateStoreSave [(value#16L % 2)#27L], state info [ checkpoint = <unknown>, runId = 8c0ae2be-5eaa-4038-bc29-a176abfaf885, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
+- *(3) HashAggregate(keys=[(value#16L % 2)#27L], functions=[merge_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- StateStoreRestore [(value#16L % 2)#27L], state info [ checkpoint = <unknown>, runId = 8c0ae2be-5eaa-4038-bc29-a176abfaf885, opId = 0, ver = 0, numPartitions = 200], 2
+- *(2) HashAggregate(keys=[(value#16L % 2)#27L], functions=[merge_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- Exchange hashpartitioning((value#16L % 2)#27L, 200)
+- *(1) HashAggregate(keys=[(value#16L % 2) AS (value#16L % 2)#27L], functions=[partial_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- *(1) Project [value#16L]
+- StreamingRelation rate, [timestamp#15, value#16L]
*/
More Demos¶
Learn more in the following demos: