Skip to content

StreamExecution

StreamExecution is an abstraction of stream execution engines (streaming query processing engines) that run a structured query (on a stream execution thread).

Creating Instance of StreamExecution

Note

Continuous query, streaming query, continuous Dataset, streaming Dataset are all considered high-level synonyms for an executable entity that stream execution engines run using the analyzed logical plan internally.

Important

StreamExecution does not support adaptive query execution and cost-based optimizer (and turns them off when requested to run stream processing).

StreamExecution is the execution environment of a streaming query that is executed every trigger and in the end adds the results to a sink.

StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

Contract

Logical Plan

logicalPlan: LogicalPlan

logicalPlan is part of the ProgressReporter abstraction.


Analyzed logical plan of the streaming query to execute

Note

No idea why logicalPlan is part of StreamExecution since there's (almost) no change from the parent ProgressReporter except the access qualifier (from protected to public).

Could this be the only reason?

Used when:

Running Activated Streaming Query

runActivatedStream(
  sparkSessionForStream: SparkSession): Unit

Executes (runs) the activated streaming query (that is described by the logical plan)

Used when:

Implementations

Creating Instance

StreamExecution takes the following to be created:

  • SparkSession (Spark SQL)
  • Name of the streaming query (can be null)
  • Path of the checkpoint directory (metadata directory)
  • Streaming query (not used due to logicalPlan)
  • Sink
  • Trigger
  • Clock
  • OutputMode
  • deleteCheckpointOnStop flag (whether to delete the checkpoint directory on stop)
Abstract Class

StreamExecution is an abstract class and cannot be created directly. It is created indirectly for the concrete StreamExecutions.

Sink

sink: Table

StreamExecution is given a Table (Spark SQL) when created.

The Table represents the sink this streaming query writes to.

sink is part of the ProgressReporter abstraction.

Starting Streaming Query

start(): Unit

start starts a stream execution thread to run stream processing.

StreamExecution's Starting Streaming Query (on Execution Thread)


start prints out the following INFO message to the logs (with the prettyId and the checkpointRoot):

Starting [prettyId]. Use [checkpointRoot] to store the query checkpoint.

start then starts the stream execution thread as a daemon thread (in its own execution thread on JVM).

Note

start uses Java's Thread.start to run the streaming query on a separate execution thread.

In the end, start pauses the main thread (using the latch) until StreamExecution is requested to run the streaming query (that sends a QueryStartedEvent to all streaming listeners followed by decrementing the count of the startLatch).


start is used when:

Configuration Properties

s.s.s.minBatchesToRetain

StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).

s.s.s.pollingDelay

StreamExecution uses spark.sql.streaming.pollingDelay configuration property to control how long to delay polling for new data (when no data was available to process in a batch).

ProgressReporter

StreamExecution is a ProgressReporter and reports status of the streaming query (when it starts, progresses and terminates) by posting StreamingQueryListener events.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .text("server-logs")
  .writeStream
  .format("console")
  .queryName("debug")
  .trigger(Trigger.ProcessingTime(20.seconds))
  .start

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())

Unique Data Streams

uniqueSources: Map[SparkDataStream, ReadLimit]

StreamExecution defines uniqueSources internal registry for the unique SparkDataStreams of a streaming query with their ReadLimits.

StreamExecution's uniqueSources Registry of Streaming Data Sources

uniqueSources registry is initialized when the stream execution engines are requested for the analyzed logical plan:

uniqueSources is used when:

Used when StreamExecution is requested to stop all streaming data streams

Streaming Query Identifiers

The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:

  • name is optional and user-defined

  • id is a UUID that is auto-generated at the time StreamExecution is created and persisted to metadata checkpoint file

  • runId is a UUID that is auto-generated every time StreamExecution is created

Id

StreamExecution is uniquely identified by an ID of the streaming query (which is the id of the StreamMetadata).

Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.

Run Id

StreamExecution is uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution is created.

runId does not "survive" query restarts and will always be different yet unique (across all active queries).

StreamMetadata

streamMetadata: StreamMetadata

StreamExecution creates a StreamMetadata when created.

StreamExecution uses metadata as the filename of the metadata file that is stored (in JSON format) in the checkpoint directory.

The metadata file is used to restore (recover) the id (e.g., when a streaming query is resumed after a failure or a planned stop).

Metadata Logs

Write-Ahead Offset Log

offsetLog: OffsetSeqLog

StreamExecution creates an OffsetSeqLog when created.

offsetLog stores offsets in offsets subdirectory of the metadata directory.

offsetLog is used as a Write-Ahead Log of Offsets to persist offsets of the data about to be processed.

The number of entries in the OffsetSeqLog is controlled using spark.sql.streaming.minBatchesToRetain configuration property.

offsetLog is used when:

Offset Commit Log

StreamExecution uses offset commit log (CommitLog with commits metadata checkpoint directory) for streaming batches successfully executed (with a single file per batch with a file name being the batch id) or committed epochs.

Note

Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

commitLog is used by the <> for the following:

  • MicroBatchExecution is requested to <> (that in turn requests to <> at the very beginning of the streaming query execution and later regularly every <>)

  • ContinuousExecution is requested to <> (that in turn requests to <> at the very beginning of the streaming query execution and later regularly every <>)

State of Streaming Query

state: AtomicReference[State]

state indicates the internal state of execution of the streaming query (as java.util.concurrent.atomic.AtomicReference).

ACTIVE

StreamExecution has been requested to <> (and is about to <>)

INITIALIZING

StreamExecution has been created.

TERMINATED

Indicates that:

RECONFIGURING

Used when ContinuousExecution is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)

Creating StreamingWrite

createStreamingWrite(
  table: SupportsWrite,
  options: Map[String, String],
  inputPlan: LogicalPlan): StreamingWrite

createStreamingWrite creates a LogicalWriteInfoImpl (with the query ID, the schema of the input LogicalPlan and the given options).

createStreamingWrite requests the given SupportsWrite (Spark SQL) table for a WriteBuilder (for the LogicalWriteInfoImpl).

createStreamingWrite branches based on the OutputMode:

  • For Append output mode, createStreamingWrite requests the WriteBuilder to build a StreamingWrite.

  • For Complete output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsTruncate (Spark SQL) and requests it to truncate followed by buildForStreaming

  • For Update output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsStreamingUpdate and requests it to update followed by buildForStreaming

createStreamingWrite is used when:

Offsets by SparkDataStream

Available

availableOffsets: StreamProgress

availableOffsets is a registry of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and have not yet been committed).

availableOffsets works in tandem with the committedOffsets internal registry.

availableOffsets is empty when StreamExecution is created (i.e. no offsets are reported for any streaming source in the streaming query).

Committed (Processed)

committedOffsets: StreamProgress

committedOffsets is a registry of offsets by streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.

committedOffsets works in tandem with the availableOffsets internal registry.

Latest

latestOffsets: StreamProgress

StreamExecution creates a StreamProgress to track the latest available offsets (by SparkDataStream) when created.

latestOffsets is updated when:

latestOffsets is used when:

Fully-Qualified (Resolved) Path to Checkpoint Root Directory

resolvedCheckpointRoot: String

resolvedCheckpointRoot is a fully-qualified path of the given checkpoint root directory.

The given checkpoint root directory is defined using checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property with queryName option.

checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.

resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

resolvedCheckpointRoot is used for the logicalPlan (while transforming analyzedPlan and planning StreamingRelation logical operators to corresponding StreamingExecutionRelation physical operators with the streaming data sources created passing in the path to sources directory to store checkpointing metadata).

resolvedCheckpointRoot is printed out immediately when resolved as a INFO message to the logs:

Checkpoint root [checkpointRoot] resolved to [resolvedCheckpointRoot].

resolvedCheckpointRoot is printed out again as a INFO message to the logs when StreamExecution is started:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

StreamWriterCommitProgress

sinkCommitProgress: Option[StreamWriterCommitProgress]

sinkCommitProgress is part of the ProgressReporter abstraction.

StreamExecution initializes sinkCommitProgress registry to be None when created.

Last Incremental QueryExecution

lastExecution: IncrementalExecution

lastExecution is part of the ProgressReporter abstraction.


lastExecution is an IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.

lastExecution is created (and requested for the "executable" SparkPlan) when the stream execution engines are requested for the following:

lastExecution is the QueryExecution of the DataFrame with new data for every microbatch in MicroBatchExecution.

lastExecution is used when:

Explaining Streaming Query

explain(): Unit // (1)!
explain(extended: Boolean): Unit
  1. Turns the extended flag off (false)

explain simply prints out explainInternal to the standard output.

Stopping Streaming Sources and Readers

stopSources(): Unit

stopSources requests every streaming source to stop.

In case of an non-fatal exception, stopSources prints out the following WARN message to the logs:

Failed to stop streaming source: [source]. Resources may have leaked.

stopSources is used when:

  • StreamExecution is requested to <> (and <> successfully or not)

  • ContinuousExecution is requested to <> (and terminates)

Running Stream Processing

runStream(): Unit

runStream simply prepares the environment to execute the activated streaming query.

runStream is used when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).

Internally, runStream sets the job group (to all the Spark jobs started by this thread) as follows:

Note

runStream uses the SparkSession to access SparkContext and assign the job group id.

Learn more about SparkContext.setJobGroup method in The Internals of Apache Spark online book.

runStream sets sql.streaming.queryId local property to id.

runStream requests the MetricsSystem to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is enabled.

runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).

StreamingQueryListener Notified about Query's Start (onQueryStarted)

runStream unblocks the main starting thread (by decrementing the count of the startLatch that when 0 lets the starting thread continue).

runStream updates the status message to be Initializing sources.

runStream initializes the analyzed logical plan.

Lazy Value

The analyzed logical plan is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

runStream disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).

runStream creates a new "zero" OffsetSeqMetadata.

(when in INITIALIZING state) runStream enters ACTIVE state:

Note

runBatches does the main work only when first started (when in INITIALIZING state).

runStream...FIXME (describe the failed and stop states)

Once TriggerExecutor has finished executing batches, runBatches updates the status message to Stopped.

NOTE: TriggerExecutor finishes executing batches when the batch runner returns whether the streaming query is stopped or not (while active).

finally Block

runStream releases the startLatch and initializationLatch latches.

runStream stopSources.

runStream enters TERMINATED state.

runStream sets the StreamingQueryStatus with the isTriggerActive and isDataAvailable flags off (false).

runStream removes the stream metrics reporter from the application's MetricsSystem.

runStream requests the StreamingQueryManager to handle termination of a streaming query.

runStream creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.

With the deleteCheckpointOnStop flag enabled and no StreamingQueryException, runStream deletes the checkpoint directory.

In the end, runStream releases the terminationLatch latch.

TriggerExecutor's Batch Runner

Batch Runner (batchRunner) is an executable block executed by TriggerExecutor in runBatches.

batchRunner starts trigger calculation.

As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In triggerExecution time-tracking section, runBatches branches off per currentBatchId:

If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.

Tip

You can check out the status of a streaming query using status method.

scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

batchRunner then updates the status message to Processing new data and runs the current streaming batch.

StreamExecution's Running Batches (on Execution Thread)

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was <> in the sources, batchRunner updates committed offsets (by adding the <> to BatchCommitLog and adding availableOffsets to committedOffsets).

batchRunner prints out the following DEBUG message to the logs:

batch [currentBatchId] committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

  1. Marks currentStatus with isDataAvailable disabled

  2. Updates the status message to Waiting for data to arrive

  3. Sleeps the current thread for pollingDelayMs milliseconds.

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not)

Path to Checkpoint Directory

checkpointFile(
  name: String): String

checkpointFile gives the path of a directory with name in checkpoint directory.

checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).

Posting StreamingQueryListener Event

postEvent(
  event: StreamingQueryListener.Event): Unit

postEvent is a part of the ProgressReporter abstraction.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

Note

postEvent uses SparkSession to access the current StreamingQueryManager.

postEvent is used when:

Waiting Until No New Data Available in Sources or Query Has Been Terminated

processAllAvailable(): Unit

processAllAvailable is a part of the StreamingQuery abstraction.

processAllAvailable reports the <> if reported (and returns immediately).

NOTE: <> is reported exclusively when StreamExecution is requested to <> (that terminated with an exception).

processAllAvailable returns immediately when StreamExecution is no longer <> (in TERMINATED state).

processAllAvailable acquires a lock on the <> and turns the <> internal flag off (false).

processAllAvailable keeps polling with 10-second pauses (locked on <>) until <> flag is turned on (true) or StreamExecution is no longer <> (in TERMINATED state).

NOTE: The 10-second pause is hardcoded and cannot be changed.

In the end, processAllAvailable releases <> lock.

processAllAvailable throws an IllegalStateException when executed on the <>:

Cannot wait for a query state from the same thread that is running the query

Stream Execution Thread

queryExecutionThread: QueryExecutionThread

queryExecutionThread is a Java thread of execution (java.util.Thread) that runs a streaming query.

queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to <>. At that time, start prints out the following INFO message to the logs (with the <> and the <>):

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

When started, queryExecutionThread sets the <> and <>.

queryExecutionThread uses the name stream execution thread for [id] (that uses <> for the id, i.e. queryName [id = [id], runId = [runId]]).

queryExecutionThread is a QueryExecutionThread that is a custom UninterruptibleThread from Apache Spark with runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().

Tip

Use Java's jconsole or jstack to monitor stream execution threads.

$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...

Batch Metadata

offsetSeqMetadata: OffsetSeqMetadata

offsetSeqMetadata is part of the ProgressReporter abstraction.


StreamExecution creates an OffsetSeqMetadata when created.

Most importantly, the OffsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.

offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to <>.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to <>.

NOTE: MicroBatchExecution uses the <> for the current event-time watermark and the <> for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in <> of MicroBatchExecution (and printed out as INFO message to the logs).

offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to <>.

offsetSeqMetadata is part of the ProgressReporter abstraction.

isActive

isActive: Boolean

isActive is part of the StreamingQuery abstraction.

isActive is enabled (true) as long as the State is not TERMINATED.

Human-Readable HTML Description of Spark Jobs (for web UI)

getBatchDescriptionString: String

getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name if defined, the <>, the <> and batchDescription that can be init (for the <> negative) or the current batch ID itself.

getBatchDescriptionString is of the following format:

[name]
id = [id]
runId = [runId]
batch = [batchDescription]

Monitoring Streaming Query using web UI (Spark Jobs)

getBatchDescriptionString is used when:

  • MicroBatchExecution stream execution engine is requested to <> (as the job description of any Spark jobs triggerred as part of query execution)

  • StreamExecution is requested to <> (as the job group description of any Spark jobs triggerred as part of query execution)

No New Data Available

noNewData: Boolean

noNewData is a flag that indicates that a batch has completed with no new data left and processAllAvailable could stop waiting till all streaming data is processed.

Default: false

Turned on (true) when:

  • MicroBatchExecution stream execution engine is requested to <> (while <>)

  • ContinuousExecution stream execution engine is requested to <>

Turned off (false) when:

  • MicroBatchExecution stream execution engine is requested to <> (right after the <> phase)

  • StreamExecution is requested to <>

Current Batch ID

newData Registry

newData: Map[BaseStreamingSource, LogicalPlan]

Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

newData is part of the ProgressReporter abstraction.

Set when StreamExecution is requested to requests unprocessed data from streaming sources (while running a single streaming batch)

MetricsReporter

StreamExecution defines a MetricsReporter to report streaming metrics.

MetricsReporter is created with the following source name (with name if defined or id):

spark.streaming.[name or id]

MetricsReporter is registered only when spark.sql.streaming.metricsEnabled configuration property is enabled (when StreamExecution is requested to run).

MetricsReporter is deactivated (removed) when a streaming query is stopped (when StreamExecution is requested to runStream).

Latches

StreamExecution uses java.util.concurrent.CountDownLatches (with count 1).

initializationLatch

Counted down when requested to runStream:

Awaited for tests only (which seems to indicate that it is a test-only latch)

startLatch

Counted down when requested to runStream:

Awaited when requested to start (to pause the main thread until StreamExecution was requested to run the streaming query on a separate thread)

terminationLatch

Counted down at the end of runStream

Awaited when requested to awaitTermination (that pauses the thread until the streaming query has finished successfully or not).

Locks

awaitProgressLock

StreamExecution uses a fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention)

__is_continuous_processing Local Property

StreamExecution uses __is_continuous_processing local property (default: false) to differentiate between <> (true) and <> (false) which is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).

Demo

import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL

Refer to Logging.