
groupBy Operator — Streaming Aggregation

groupBy(
  cols: Column*): RelationalGroupedDataset
groupBy(
  col1: String,
  cols: String*): RelationalGroupedDataset

The groupBy operator groups the rows of a streaming Dataset by zero, one or more columns and returns a RelationalGroupedDataset to apply aggregation functions to.
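
A minimal sketch with the built-in rate source (the grouping expression and names are illustrative only):

val rates = spark.
  readStream.
  format("rate").
  load  // columns: timestamp, value

// zero-column (global) aggregation
val globalCount = rates.groupBy().count

// one-column aggregation (illustrative grouping expression)
val byGroup = rates.groupBy('value % 10 as "group").count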

Demo

val fromTopic1 = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load
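// The Kafka source has a fixed schema:
// key: binary, value: binary, topic: string, partition: int,
// offset: long, timestamp: timestamp, timestampType: int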

// extract event time et al
// time,key,value
/*
2017-08-23T00:00:00.002Z,1,now
2017-08-23T00:05:00.002Z,1,5 mins later
2017-08-23T00:09:00.002Z,1,9 mins later
2017-08-23T00:11:00.002Z,1,11 mins later
2017-08-23T01:00:00.002Z,1,1 hour later
// late event => by now the watermark should already be at (1 hour - 10 minutes), i.e. 00:50:00.002
2017-08-23T00:49:59.002Z,1,==> SHOULD NOT BE INCLUDED in aggregation as too late <==

CAUTION: FIXME The record marked SHOULD NOT BE INCLUDED does get included in the aggregation, contrary to my understanding?!
*/
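// One way to feed topic1 with the sample records above (illustrative only;
// assumes a local Kafka broker at localhost:9092 and that topic1 exists or
// topic auto-creation is enabled). Note that writing them in a single batch
// means they all land in one micro-batch, unlike records trickling in over time.
val samples = Seq(
  "2017-08-23T00:00:00.002Z,1,now",
  "2017-08-23T00:05:00.002Z,1,5 mins later",
  "2017-08-23T00:09:00.002Z,1,9 mins later",
  "2017-08-23T00:11:00.002Z,1,11 mins later",
  "2017-08-23T01:00:00.002Z,1,1 hour later",
  "2017-08-23T00:49:59.002Z,1,==> SHOULD NOT BE INCLUDED in aggregation as too late <==").toDF("value")
samples.
  write.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("topic", "topic1").
  save
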
val timedValues = fromTopic1.
  select('value cast "string").
  withColumn("tokens", split('value, ",")).
  withColumn("time", to_timestamp('tokens(0))).
  withColumn("key", 'tokens(1) cast "int").
  withColumn("value", 'tokens(2)).
  select("time", "key", "value")

// aggregation with watermark
val counts = timedValues.
  withWatermark("time", "10 minutes").
  groupBy("key").
  agg(collect_list('value) as "values", collect_list('time) as "times")
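
// A possible variant (illustrative only): make the watermarked event-time column
// part of the grouping via a window, so that Spark can also evict expired windows
// from the state store and the Append output mode becomes available
import org.apache.spark.sql.functions.{window, collect_list}
val windowedCounts = timedValues.
  withWatermark("time", "10 minutes").
  groupBy(window('time, "10 minutes"), 'key).
  agg(collect_list('value) as "values")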

// Note that StatefulOperatorStateInfo holds mostly placeholder values (e.g. the <unknown> checkpoint location)
// since the batch-specific values become available
// only after the first streaming micro-batch has been executed
scala> counts.explain
== Physical Plan ==
ObjectHashAggregate(keys=[key#27], functions=[collect_list(value#33, 0, 0), collect_list(time#22-T600000ms, 0, 0)])
+- Exchange hashpartitioning(key#27, 200)
   +- StateStoreSave [key#27], StatefulOperatorStateInfo(<unknown>,25149816-1f14-4901-af13-896286a26d42,0,0), Append, 0
      +- ObjectHashAggregate(keys=[key#27], functions=[merge_collect_list(value#33, 0, 0), merge_collect_list(time#22-T600000ms, 0, 0)])
         +- Exchange hashpartitioning(key#27, 200)
            +- StateStoreRestore [key#27], StatefulOperatorStateInfo(<unknown>,25149816-1f14-4901-af13-896286a26d42,0,0)
               +- ObjectHashAggregate(keys=[key#27], functions=[merge_collect_list(value#33, 0, 0), merge_collect_list(time#22-T600000ms, 0, 0)])
                  +- Exchange hashpartitioning(key#27, 200)
                     +- ObjectHashAggregate(keys=[key#27], functions=[partial_collect_list(value#33, 0, 0), partial_collect_list(time#22-T600000ms, 0, 0)])
                        +- EventTimeWatermark time#22: timestamp, interval 10 minutes
                           +- *Project [cast(split(cast(value#1 as string), ,)[0] as timestamp) AS time#22, cast(split(cast(value#1 as string), ,)[1] as int) AS key#27, split(cast(value#1 as string), ,)[2] AS value#33]
                              +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

import org.apache.spark.sql.streaming._
import scala.concurrent.duration._
val sq = counts.writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(30.seconds)).
  outputMode(OutputMode.Update).  // <-- only Update or Complete are acceptable here (Append would require the watermarked event-time column among the grouping columns)
  start
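
// Illustrative only: Append output mode is expected to be rejected for this query,
// since the watermarked column (time) is not among the grouping columns,
// so starting it should fail with an AnalysisException
import scala.util.Try
val appendAttempt = Try {
  counts.writeStream.
    format("console").
    outputMode(OutputMode.Append).
    start
}
// appendAttempt is expected to be a scala.util.Failure(org.apache.spark.sql.AnalysisException)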

// After the StreamingQuery has started,
// the physical plan is complete (with the batch-specific values)
scala> sq.explain
== Physical Plan ==
ObjectHashAggregate(keys=[key#27], functions=[collect_list(value#33, 0, 0), collect_list(time#22-T600000ms, 0, 0)])
+- Exchange hashpartitioning(key#27, 200)
   +- StateStoreSave [key#27], StatefulOperatorStateInfo(file:/private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-635d6519-b6ca-4686-9b6b-5db0e83cfd51/state,855cec1c-25dc-4a86-ae54-c6cdd4ed02ec,0,0), Update, 0
      +- ObjectHashAggregate(keys=[key#27], functions=[merge_collect_list(value#33, 0, 0), merge_collect_list(time#22-T600000ms, 0, 0)])
         +- Exchange hashpartitioning(key#27, 200)
            +- StateStoreRestore [key#27], StatefulOperatorStateInfo(file:/private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-635d6519-b6ca-4686-9b6b-5db0e83cfd51/state,855cec1c-25dc-4a86-ae54-c6cdd4ed02ec,0,0)
               +- ObjectHashAggregate(keys=[key#27], functions=[merge_collect_list(value#33, 0, 0), merge_collect_list(time#22-T600000ms, 0, 0)])
                  +- Exchange hashpartitioning(key#27, 200)
                     +- ObjectHashAggregate(keys=[key#27], functions=[partial_collect_list(value#33, 0, 0), partial_collect_list(time#22-T600000ms, 0, 0)])
                        +- EventTimeWatermark time#22: timestamp, interval 10 minutes
                           +- *Project [cast(split(cast(value#76 as string), ,)[0] as timestamp) AS time#22, cast(split(cast(value#76 as string), ,)[1] as int) AS key#27, split(cast(value#76 as string), ,)[2] AS value#33]
                              +- Scan ExistingRDD[key#75,value#76,topic#77,partition#78,offset#79L,timestamp#80,timestampType#81]
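
To investigate the watermark behaviour (e.g. the FIXME above about the late event), inspecting the streaming query progress may help, as it reports the current event-time watermark and the state store metrics. A sketch (note that lastProgress is null until the first progress has been reported):

scala> println(sq.lastProgress)

scala> sq.lastProgress.eventTime.get("watermark")

scala> sq.lastProgress.stateOperators.foreach(println)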