Skip to content

IncrementalExecution

IncrementalExecution is a QueryExecution (Spark SQL) made for streaming queries with the following customizations:

IncrementalExecution is part of StreamExecutions (e.g. MicroBatchExecution and ContinuousExecution).

Creating Instance

IncrementalExecution takes the following to be created:

IncrementalExecution is created when:

OutputMode

IncrementalExecution is given an OutputMode when created.

The OutputMode is used for the following:

State Checkpoint Location

IncrementalExecution is given a directory (location) for state checkpointing when created.

The directory is as follows:

val queryName = "rate2memory"
val checkpointLocation = s"file:/tmp/checkpoint-$queryName"
val query = spark
  .readStream
  .format("rate")
  .load
  .groupBy($"value" % 2 as "gid")
  .count
scala> query.explain
== Physical Plan ==
*(4) HashAggregate(keys=[_groupingexpression#22L], functions=[count(1)])
+- StateStoreSave [_groupingexpression#22L], state info [ checkpoint = <unknown>, runId = 0ec02b7b-ada5-4a71-970e-145453417319, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
   +- *(3) HashAggregate(keys=[_groupingexpression#22L], functions=[merge_count(1)])
      +- StateStoreRestore [_groupingexpression#22L], state info [ checkpoint = <unknown>, runId = 0ec02b7b-ada5-4a71-970e-145453417319, opId = 0, ver = 0, numPartitions = 200], 2
         +- *(2) HashAggregate(keys=[_groupingexpression#22L], functions=[merge_count(1)])
            +- Exchange hashpartitioning(_groupingexpression#22L, 200), ENSURE_REQUIREMENTS, [plan_id=138]
               +- *(1) HashAggregate(keys=[_groupingexpression#22L], functions=[partial_count(1)])
                  +- *(1) Project [(value#9L % 2) AS _groupingexpression#22L]
                     +- StreamingRelation rate, [timestamp#8, value#9L]
query.writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .start

// Give the streaming query a moment (one micro-batch)
// So lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val stateCheckpointDir = query
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
  .checkpointLocation
val stateDir = s"$checkpointLocation/state"
assert(stateCheckpointDir equals stateDir)

The State Checkpoint Location is used when:

StreamingExplainCommand

IncrementalExecution is used to create a StreamingExplainCommand.

statefulOperatorId

IncrementalExecution uses the statefulOperatorId internal counter for the IDs of the stateful operators in the optimized logical plan (while applying the preparations rules) when requested to prepare the plan for execution (in executedPlan phase).

Preparing Logical Plan (of Streaming Query) for Execution

When requested for an optimized logical plan (of the analyzed logical plan), IncrementalExecution transforms CurrentBatchTimestamp and ExpressionWithRandomSeed expressions with the timestamp literal and new random seeds, respectively. When transforming CurrentBatchTimestamp expressions, IncrementalExecution prints out the following INFO message to the logs:

Current batch timestamp = [timestamp]

Right after being created, IncrementalExecution is executed (in the queryPlanning phase by the MicroBatchExecution and ContinuousExecution stream execution engines) and so the entire query execution pipeline is executed up to and including executedPlan. That means that the extra planning strategies and the state preparation rule have been applied at this point and the streaming query is ready for execution.

Physical Optimization (Preparations Rules)

preparations: Seq[Rule[SparkPlan]]

preparations is part of the QueryExecution (Spark SQL) abstraction.


preparations is the state optimization rules before the parent's ones.

Number of State Stores (spark.sql.shuffle.partitions)

numStateStores: Int

numStateStores is the number of state stores which corresponds to spark.sql.shuffle.partitions configuration property (default: 200).

Tip

Learn more about spark.sql.shuffle.partitions configuration property in The Internals of Spark SQL online book.

Internally, numStateStores requests the OffsetSeqMetadata for the spark.sql.shuffle.partitions configuration property (using the streaming configuration) or simply takes whatever was defined for the given SparkSession (default: 200).

numStateStores is initialized right when IncrementalExecution is created.

numStateStores is used when IncrementalExecution is requested for the state info of the next stateful operator (when requested to optimize a streaming physical plan using the state preparation rule that creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).

Extra Planning Strategies

IncrementalExecution uses a custom SparkPlanner (Spark SQL) with the following extra planning strategies to plan the streaming query for execution:

  1. StreamingJoinStrategy
  2. StatefulAggregationStrategy
  3. FlatMapGroupsWithStateStrategy
  4. StreamingRelationStrategy
  5. StreamingDeduplicationStrategy
  6. StreamingGlobalLimitStrategy

State Preparation Rule For Execution-Specific Configuration

state: Rule[SparkPlan]

state is a custom physical preparation rule (Rule[SparkPlan]) that can transform a streaming physical plan (SparkPlan) with the following physical operators:

state simply transforms the physical plan with the above physical operators and fills out the execution-specific configuration:

state rule is used (as part of the physical query optimizations) when IncrementalExecution is requested to optimize (prepare) the physical plan of the streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution in queryPlanning phase).

Tip

Learn more about Physical Query Optimizations in The Internals of Spark SQL online book.

Next StatefulOperationStateInfo

nextStatefulOperationStateInfo(): StatefulOperatorStateInfo

nextStatefulOperationStateInfo simply creates a new StatefulOperatorStateInfo with the state checkpoint location, the run ID (of the streaming query), the next statefulOperator ID, the current batch ID, and the number of state stores.

Note

The only changing part of StatefulOperatorStateInfo across calls of the nextStatefulOperationStateInfo method is the the next statefulOperator ID.

All the other properties (the state checkpoint location, the run ID, the current batch ID, and the number of state stores) are the same within a single IncrementalExecution instance.

The only two properties that may ever change are the run ID (after a streaming query is restarted from the checkpoint) and the current batch ID (every micro-batch in MicroBatchExecution execution engine).

nextStatefulOperationStateInfo is used when IncrementalExecution is requested to optimize a streaming physical plan using the state preparation rule (and creates the stateful physical operators: StateStoreSaveExec, StateStoreRestoreExec, StreamingDeduplicateExec, FlatMapGroupsWithStateExec, StreamingSymmetricHashJoinExec, and StreamingGlobalLimitExec).

Checking Out Whether Last Execution Requires Another Non-Data Micro-Batch

shouldRunAnotherBatch(
   newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is positive (true) if there is at least one StateStoreWriter operator (in the executedPlan physical query plan) that requires another non-data batch (per the given OffsetSeqMetadata with the event-time watermark and the batch timestamp).

Otherwise, shouldRunAnotherBatch is negative (false).

shouldRunAnotherBatch is used when MicroBatchExecution is requested to construct the next streaming micro-batch (and checks out whether the last batch execution requires another non-data batch).

Demo: State Checkpoint Directory

Using setConf(SHUFFLE_PARTITIONS, 1) will make for an easier debugging as the state is then only for one partition and makes monitoring easier.

import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)

assert(spark.sessionState.conf.numShufflePartitions == 1)

Using the rate source as an input.

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group")  // <-- makes for easier checking

assert(counts.isStreaming, "This should be a streaming query")

Searching for checkpoint = <unknown> in the following output for StateStoreSaveExec and StateStoreRestoreExec physical operators.

scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

Start the query with the checkpointLocation option.

val checkpointLocation = "/tmp/spark-streams-state-checkpoint-root"

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", checkpointLocation)
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

Wait till the first batch which should happen right after start and access lastExecution that has the checkpoint resolved.

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
assert(lastExecution.checkpointLocation == s"file:${checkpointLocation}/state")