FlatMapGroupsWithStateStrategy Execution Planning Strategy
FlatMapGroupsWithStateStrategy is an execution planning strategy (Spark SQL) that plans FlatMapGroupsWithState logical operators in streaming queries as FlatMapGroupsWithStateExec physical operators (with StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark left undefined).
FlatMapGroupsWithStateStrategy is part of the extraPlanningStrategies of the SparkPlanner (of IncrementalExecution).
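The planning step described above can be pictured with a small toy model. This is only a sketch: the case classes below (`FlatMapGroupsWithStateLogical`, `FlatMapGroupsWithStateExecModel`, `PlanLater`, `OtherLogical`) and the `ToyStrategy` object are hypothetical stand-ins invented for illustration, not Spark's real operators or the real strategy code. The sketch shows a strategy matching one logical operator and emitting a physical operator whose state info, batch timestamp, and event-time watermark are left undefined (`None`).

```scala
// Toy model only: every type here is a hypothetical stand-in, not a Spark class.
sealed trait LogicalOp
case class FlatMapGroupsWithStateLogical(funcName: String, child: LogicalOp) extends LogicalOp
case class OtherLogical(name: String) extends LogicalOp

sealed trait PhysicalOp
// Placeholder for a child that is planned in a later step
case class PlanLater(logical: LogicalOp) extends PhysicalOp
case class FlatMapGroupsWithStateExecModel(
    funcName: String,
    stateInfo: Option[String],        // undefined at planning time
    stateFormatVersion: Int,          // taken from configuration
    batchTimestampMs: Option[Long],   // undefined at planning time
    eventTimeWatermark: Option[Long], // undefined at planning time
    child: PhysicalOp) extends PhysicalOp

object ToyStrategy {
  // Returns one physical operator for the operator it handles, and Nil for everything else
  def plan(op: LogicalOp, stateFormatVersion: Int): Seq[PhysicalOp] = op match {
    case FlatMapGroupsWithStateLogical(funcName, child) =>
      Seq(FlatMapGroupsWithStateExecModel(
        funcName, None, stateFormatVersion, None, None, PlanLater(child)))
    case _ => Nil
  }
}
```

Calling `ToyStrategy.plan(FlatMapGroupsWithStateLogical("stateFunc", OtherLogical("source")), 2)` yields a single-element sequence, while any other operator yields an empty one, which mirrors how a planning strategy signals that it does not handle a plan.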
spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion
FlatMapGroupsWithStateStrategy uses the spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion configuration property for the state format version of FlatMapGroupsWithStateExec.
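As a minimal sketch, the property can be inspected and set through the session's runtime configuration. The snippet assumes a spark-shell session where `spark` is the active SparkSession. The value `"2"` is an assumption about what your Spark version accepts, so check the property's definition in your release before relying on it.

```scala
// Assumes spark-shell, where `spark` is the active SparkSession.
val key = "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion"

// Read the value currently in effect for this session
println(spark.conf.get(key))

// Set it before starting a streaming query
// ("2" is an assumed valid value; verify against your Spark version)
spark.conf.set(key, "2")
```

The setting is meant for queries started after the change.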
Demo
// Imports come first: stateFunc below refers to Timestamp
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
// Encoders for as[...] and groupByKey (already in scope in spark-shell)
import spark.implicits._

// Emits (key, number of values in the group) and leaves the state untouched
val stateFunc = (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[Long]) => {
  Iterator((key, values.size))
}

val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }.
  flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(stateFunc)
scala> numGroups.explain(true)
== Parsed Logical Plan ==
'SerializeFromObject [assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#267L, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#268]
+- 'FlatMapGroupsWithState <function3>, unresolveddeserializer(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"), value#262L), unresolveddeserializer(newInstance(class scala.Tuple2), timestamp#253, value#254L), [value#262L], [timestamp#253, value#254L], obj#266: scala.Tuple2, class[value[0]: bigint], Update, false, NoTimeout
   +- AppendColumns <function1>, class scala.Tuple2, [StructField(_1,TimestampType,true), StructField(_2,LongType,false)], newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#262L]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@38bcac50,rate,List(),None,List(),None,Map(),None), rate, [timestamp#253, value#254L]
...
== Physical Plan ==
*SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#267L, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#268]
+- FlatMapGroupsWithState <function3>, value#262: bigint, newInstance(class scala.Tuple2), [value#262L], [timestamp#253, value#254L], obj#266: scala.Tuple2, StatefulOperatorStateInfo(<unknown>,84b5dccb-3fa6-4343-a99c-6fa5490c9b33,0,0), class[value[0]: bigint], Update, NoTimeout, 0, 0
   +- *Sort [value#262L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#262L, 200)
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#262L]
            +- StreamingRelation rate, [timestamp#253, value#254L]