StreamExecution¶
StreamExecution is an abstraction of stream execution engines (streaming query processing engines) that run a structured query (on a stream execution thread).

Note
Continuous query, streaming query, continuous Dataset, streaming Dataset are all considered high-level synonyms for an executable entity that stream execution engines run using the analyzed logical plan internally.
Important
StreamExecution does not support adaptive query execution and cost-based optimizer (and turns them off when requested to run stream processing).
StreamExecution is the execution environment of a streaming query that is executed every trigger and in the end adds the results to a sink.
StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
readStream.
format("rate").
load.
writeStream.
format("console").
trigger(Trigger.ProcessingTime(10.minutes)).
start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery
// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Contract¶
Logical Plan¶
logicalPlan: LogicalPlan
logicalPlan is part of the ProgressReporter abstraction.
Analyzed logical plan of the streaming query to execute
Note
No idea why logicalPlan is part of StreamExecution since there's (almost) no change from the parent ProgressReporter except the access qualifier (from protected to public).
Could this be the only reason?
Used when:
- StreamExecution is requested to run stream processing
Running Activated Streaming Query¶
runActivatedStream(
sparkSessionForStream: SparkSession): Unit
Executes (runs) the activated streaming query (that is described by the logical plan)
Used when:
- StreamExecution is requested to run the streaming query (when transitioning from INITIALIZING to ACTIVE state)
Implementations¶
- ContinuousExecution
- MicroBatchExecution
Creating Instance¶
StreamExecution takes the following to be created:
- SparkSession (Spark SQL)
- Name of the streaming query (can be null)
- Path of the checkpoint directory (metadata directory)
- Streaming query (not used due to logicalPlan)
- Sink
- Trigger
- Clock
- OutputMode
- deleteCheckpointOnStop flag (whether to delete the checkpoint directory on stop)
Abstract Class
StreamExecution is an abstract class and cannot be created directly. It is created indirectly for the concrete StreamExecutions.
Sink¶
sink: Table
StreamExecution is given a Table (Spark SQL) when created.
The Table represents the sink this streaming query writes to.
sink is part of the ProgressReporter abstraction.
Starting Streaming Query¶
start(): Unit
start starts a stream execution thread to run stream processing.

start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):
Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
start then starts the stream execution thread as a daemon thread (in its own execution thread on JVM).
Note
start uses Java's Thread.start to run the streaming query on a separate execution thread.
In the end, start pauses the main thread (using the latch) until StreamExecution is requested to run the streaming query (that sends a QueryStartedEvent to all streaming listeners followed by decrementing the count of the startLatch).
start is used when:
- StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of the streaming query)
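The start handshake described above can be sketched with plain JDK primitives. This is a minimal model, not Spark's code: the class name `StartHandshakeSketch` and the `startedEventPosted` flag are hypothetical stand-ins, and setting the flag only represents posting a QueryStartedEvent.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for StreamExecution that models only the start handshake.
final class StartHandshakeSketch {
  private val startLatch = new CountDownLatch(1)
  val startedEventPosted = new AtomicBoolean(false)

  private val queryExecutionThread = new Thread("stream execution thread (sketch)") {
    override def run(): Unit = {
      try {
        // Stands in for posting a QueryStartedEvent to the streaming listeners.
        startedEventPosted.set(true)
        // Unblock the thread that called start().
        startLatch.countDown()
        // ... the streaming query itself would run here ...
      } finally {
        // Counting down a latch that is already at zero is a no-op, so a failure
        // before the first countDown cannot leave the caller blocked forever.
        startLatch.countDown()
      }
    }
  }

  def start(): Unit = {
    queryExecutionThread.setDaemon(true)
    queryExecutionThread.start()
    // Pause the calling thread until the query thread has announced the start.
    startLatch.await()
  }
}

object StartHandshakeSketch {
  def main(args: Array[String]): Unit = {
    val query = new StartHandshakeSketch
    query.start()
    println(s"started event posted: ${query.startedEventPosted.get}")
  }
}
```

Because the caller only resumes after the latch is counted down, anything the query thread did before `countDown` (here, setting the flag) is visible once `start` returns.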
Configuration Properties¶
s.s.s.minBatchesToRetain¶
StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).
s.s.s.pollingDelay¶
StreamExecution uses spark.sql.streaming.pollingDelay configuration property to control how long to delay polling for new data (when no data was available to process in a batch).
ProgressReporter¶
StreamExecution is a ProgressReporter and reports status of the streaming query (when it starts, progresses and terminates) by posting StreamingQueryListener events.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
.readStream
.text("server-logs")
.writeStream
.format("console")
.queryName("debug")
.trigger(Trigger.ProcessingTime(20.seconds))
.start
// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
"id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
"runId" : "920b227e-6d02-4a03-a271-c62120258cea",
"name" : "debug",
"timestamp" : "2017-06-18T19:21:07.693Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 5,
"triggerExecution" : 9
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
}
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
Unique Data Streams¶
uniqueSources: Map[SparkDataStream, ReadLimit]
StreamExecution defines uniqueSources internal registry for the unique SparkDataStreams of a streaming query with their ReadLimits.

uniqueSources registry is initialized when the stream execution engines are requested for the analyzed logical plan.
uniqueSources is used when:
- MicroBatchExecution is requested to construct next micro-batch (and gets offsets of data streams)
- StreamExecution is requested to stop all streaming data streams
Streaming Query Identifiers¶
The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:
- name is optional and user-defined
- id is a UUID that is auto-generated at the time StreamExecution is created and persisted to the metadata checkpoint file
- runId is a UUID that is auto-generated every time StreamExecution is created
Id¶
StreamExecution is uniquely identified by an ID of the streaming query (which is the id of the StreamMetadata).
Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.
Run Id¶
StreamExecution is uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution is created.
runId does not "survive" query restarts and will always be different yet unique (across all active queries).
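The difference between the two identifiers can be illustrated with a small simulation. It is only a sketch under stated assumptions: `restoreOrCreateId` and `startQuery` are hypothetical helpers, and the metadata file here holds a bare UUID string rather than the JSON that Spark writes.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import java.util.UUID

object QueryIdsSketch {
  // Hypothetical helper: restores the query id from a "metadata" file in the
  // checkpoint directory, or generates and persists a new one.
  def restoreOrCreateId(checkpointDir: Path): UUID = {
    val metadataFile = checkpointDir.resolve("metadata")
    if (Files.exists(metadataFile)) {
      UUID.fromString(new String(Files.readAllBytes(metadataFile), UTF_8).trim)
    } else {
      val id = UUID.randomUUID()
      Files.createDirectories(checkpointDir)
      Files.write(metadataFile, id.toString.getBytes(UTF_8))
      id
    }
  }

  // One (simulated) start of the query: the id is restored, the runId is always fresh.
  def startQuery(checkpointDir: Path): (UUID, UUID) =
    (restoreOrCreateId(checkpointDir), UUID.randomUUID())

  def main(args: Array[String]): Unit = {
    val checkpointDir = Files.createTempDirectory("checkpoint-sketch")
    val (id1, runId1) = startQuery(checkpointDir)
    val (id2, runId2) = startQuery(checkpointDir) // simulated restart
    println(s"same id: ${id1 == id2}, same runId: ${runId1 == runId2}")
  }
}
```

Deleting the checkpoint directory between the two calls would produce a new id as well, which matches the condition that the directory must be preserved.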
StreamMetadata¶
streamMetadata: StreamMetadata
StreamExecution creates a StreamMetadata when created.
StreamExecution uses metadata as the filename of the metadata file that is stored (in JSON format) in the checkpoint directory.
The metadata file is used to restore (recover) the id (e.g., when a streaming query is resumed after a failure or a planned stop).
Metadata Logs¶
Write-Ahead Offset Log¶
offsetLog: OffsetSeqLog
StreamExecution creates an OffsetSeqLog when created.
offsetLog stores offsets in offsets subdirectory of the metadata directory.
offsetLog is used as a Write-Ahead Log of Offsets to persist offsets of the data about to be processed.
The number of entries in the OffsetSeqLog is controlled using spark.sql.streaming.minBatchesToRetain configuration property.
offsetLog is used when:
- ContinuousExecution stream execution engine is requested to commit an epoch, getStartOffsets, and addOffset
- MicroBatchExecution stream execution engine is requested to populate start offsets and construct (or skip) the next streaming micro-batch
Offset Commit Log¶
StreamExecution uses an offset commit log (a CommitLog in the commits directory of the metadata checkpoint directory) to record streaming batches that were executed successfully (one file per batch, named after the batch ID) or committed epochs.
Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.
commitLog is used by the stream execution engines (MicroBatchExecution and ContinuousExecution) at the very beginning of the streaming query execution and regularly afterwards.
State of Streaming Query¶
state: AtomicReference[State]
state indicates the internal state of execution of the streaming query (as java.util.concurrent.atomic.AtomicReference).
ACTIVE¶
StreamExecution has been requested to run stream processing (and is about to run the activated streaming query).
INITIALIZING¶
StreamExecution has been created.
TERMINATED¶
Indicates that:
- MicroBatchExecution has been requested to stop
- ContinuousExecution has been requested to stop
- StreamExecution has been requested to run stream processing (and has finished running the activated streaming query)
RECONFIGURING¶
Used when ContinuousExecution is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)
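The state transitions above can be modelled with an AtomicReference and compare-and-set. The sketch below is an illustration only: the `Query` class and its `activate` and `stop` methods are hypothetical names, and only the state names come from this page.

```scala
import java.util.concurrent.atomic.AtomicReference

object QueryStateSketch {
  sealed trait State
  case object INITIALIZING extends State
  case object ACTIVE extends State
  case object TERMINATED extends State
  case object RECONFIGURING extends State

  // Hypothetical holder of the internal state of a streaming query.
  final class Query {
    val state = new AtomicReference[State](INITIALIZING)

    // Succeeds only once: a query that is already ACTIVE or TERMINATED stays as it is.
    def activate(): Boolean = state.compareAndSet(INITIALIZING, ACTIVE)

    def stop(): Unit = state.set(TERMINATED)

    // Active means "not TERMINATED", so INITIALIZING also counts as active.
    def isActive: Boolean = state.get != TERMINATED
  }

  def main(args: Array[String]): Unit = {
    val query = new Query
    println(s"activated: ${query.activate()}, active: ${query.isActive}")
    query.stop()
    println(s"activated after stop: ${query.activate()}, active: ${query.isActive}")
  }
}
```

Using compare-and-set for the INITIALIZING to ACTIVE step means a query that was stopped before it started running cannot be switched back to ACTIVE.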
Creating StreamingWrite¶
createStreamingWrite(
table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite
createStreamingWrite creates a LogicalWriteInfoImpl (with the query ID, the schema of the input LogicalPlan and the given options).
createStreamingWrite requests the given SupportsWrite (Spark SQL) table for a WriteBuilder (for the LogicalWriteInfoImpl).
createStreamingWrite branches based on the OutputMode:
- For Append output mode, createStreamingWrite requests the WriteBuilder to build a StreamingWrite.
- For Complete output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsTruncate (Spark SQL) and requests it to truncate followed by buildForStreaming.
- For Update output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsStreamingUpdate and requests it to update followed by buildForStreaming.
createStreamingWrite is used when:
- MicroBatchExecution and ContinuousExecution stream execution engines are requested for analyzed logical plans
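The branching on the output mode can be sketched as a pattern match. Everything in the block is a simplified, hypothetical stand-in: the traits only borrow the names used above, `buildForStreaming` returns a string instead of a StreamingWrite, and `RecordingBuilder` exists purely to show which calls are made.

```scala
object StreamingWriteSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Complete extends OutputMode
  case object Update extends OutputMode

  // Hypothetical, heavily simplified stand-ins for the Spark SQL connector interfaces.
  trait WriteBuilder { def buildForStreaming(): String }
  trait SupportsTruncate extends WriteBuilder { def truncate(): WriteBuilder }
  trait SupportsStreamingUpdate extends WriteBuilder { def update(): WriteBuilder }

  def createStreamingWrite(builder: WriteBuilder, mode: OutputMode): String = mode match {
    case Append => builder.buildForStreaming()
    case Complete => builder match {
      case t: SupportsTruncate => t.truncate().buildForStreaming()
      case _ => throw new IllegalArgumentException("Complete mode requires truncate support")
    }
    case Update => builder match {
      case u: SupportsStreamingUpdate => u.update().buildForStreaming()
      case _ => throw new IllegalArgumentException("Update mode requires update support")
    }
  }

  // Demo builder that records the sequence of calls it received.
  final class RecordingBuilder(calls: List[String] = Nil)
      extends SupportsTruncate with SupportsStreamingUpdate {
    def truncate(): WriteBuilder = new RecordingBuilder(calls :+ "truncate")
    def update(): WriteBuilder = new RecordingBuilder(calls :+ "update")
    def buildForStreaming(): String = (calls :+ "buildForStreaming").mkString(" -> ")
  }

  def main(args: Array[String]): Unit =
    Seq(Append, Complete, Update).foreach { mode =>
      println(s"$mode: ${createStreamingWrite(new RecordingBuilder(), mode)}")
    }
}
```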
Offsets by SparkDataStream¶
Available¶
availableOffsets: StreamProgress
availableOffsets is a registry of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and has not yet been committed).
availableOffsets works in tandem with the committedOffsets internal registry.
availableOffsets is empty when StreamExecution is created (i.e. no offsets are reported for any streaming source in the streaming query).
Committed (Processed)¶
committedOffsets: StreamProgress
committedOffsets is a registry of offsets by streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.
committedOffsets works in tandem with the availableOffsets internal registry.
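How the two registries work in tandem can be shown with a toy model. This is a sketch under simplifying assumptions: sources are identified by strings, offsets are plain longs, and `Progress`, `withLatest` and `commitBatch` are hypothetical names (the real registries are StreamProgress instances keyed by SparkDataStream).

```scala
object OffsetTrackingSketch {
  // Hypothetical simplification: sources are named by strings and offsets are longs.
  type Offsets = Map[String, Long]

  final case class Progress(available: Offsets = Map.empty, committed: Offsets = Map.empty) {
    // New data exists for any source whose available offset differs from its committed one.
    def isNewDataAvailable: Boolean =
      available.exists { case (source, offset) => !committed.get(source).contains(offset) }

    // Record the latest offsets reported by the sources.
    def withLatest(latest: Offsets): Progress = copy(available = available ++ latest)

    // After a successful batch, everything that was available becomes committed.
    def commitBatch: Progress = copy(committed = committed ++ available)
  }

  def main(args: Array[String]): Unit = {
    val created = Progress()
    val fetched = created.withLatest(Map("rate" -> 10L))
    val done = fetched.commitBatch
    println(Seq(created, fetched, done).map(_.isNewDataAvailable).mkString(", "))
  }
}
```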
Latest¶
latestOffsets: StreamProgress
StreamExecution creates a StreamProgress to track the latest available offsets (by SparkDataStream) when created.
latestOffsets is updated when:
- MicroBatchExecution is requested to constructNextBatch (after getting the latest available offsets from unique sources)
latestOffsets is used when:
- MicroBatchExecution is requested to runActivatedStream (and recordTriggerOffsets)
- ContinuousExecution is requested to commit an epoch (and recordTriggerOffsets)
Fully-Qualified (Resolved) Path to Checkpoint Root Directory¶
resolvedCheckpointRoot: String
resolvedCheckpointRoot is a fully-qualified path of the given checkpoint root directory.
The given checkpoint root directory is defined using checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property with queryName option.
checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.
resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.
resolvedCheckpointRoot is used for the logicalPlan (while transforming analyzedPlan and planning StreamingRelation logical operators to corresponding StreamingExecutionRelation physical operators with the streaming data sources created passing in the path to sources directory to store checkpointing metadata).
resolvedCheckpointRoot is printed out immediately when resolved as an INFO message to the logs:
Checkpoint root [checkpointRoot] resolved to [resolvedCheckpointRoot].
resolvedCheckpointRoot is printed out again as an INFO message to the logs when StreamExecution is started:
Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
StreamWriterCommitProgress¶
sinkCommitProgress: Option[StreamWriterCommitProgress]
sinkCommitProgress is part of the ProgressReporter abstraction.
StreamExecution initializes sinkCommitProgress registry to be None when created.
Last Incremental QueryExecution¶
lastExecution: IncrementalExecution
lastExecution is part of the ProgressReporter abstraction.
lastExecution is an IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.
lastExecution is created (and requested for the "executable" SparkPlan) when the stream execution engines are requested for the following:
- MicroBatchExecution is requested to run a single streaming micro-batch (when in queryPlanning Phase)
- ContinuousExecution is requested to run a streaming query (when in queryPlanning Phase)
lastExecution is the QueryExecution of the DataFrame with new data for every microbatch in MicroBatchExecution.
lastExecution is used when:
- MicroBatchExecution is requested to construct or skip the next streaming micro-batch (if there are StateStoreWriters in a streaming query), run a single streaming micro-batch (when in addBatch Phase and updating watermark and committing offsets to offset commit log)
- StreamExecution is requested to explain a streaming query (using explainInternal)
- ContinuousExecution is requested to run a streaming query (when in runContinuous Phase)
- For debugging query execution of a streaming query (codegenString, codegenStringSeq)
Explaining Streaming Query¶
explain(): Unit // turns the extended flag off (false)
explain(extended: Boolean): Unit
explain simply prints out explainInternal to the standard output.
Stopping Streaming Sources and Readers¶
stopSources(): Unit
stopSources requests every streaming source to stop.
In case of a non-fatal exception, stopSources prints out the following WARN message to the logs:
Failed to stop streaming source: [source]. Resources may have leaked.
stopSources is used when:
- StreamExecution is requested to run stream processing (and finishes successfully or not)
- ContinuousExecution is requested to run a streaming query in continuous mode (and terminates)
Running Stream Processing¶
runStream(): Unit
runStream simply prepares the environment to execute the activated streaming query.
runStream is used when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).
Internally, runStream sets the job group (to all the Spark jobs started by this thread) as follows:
- runId for the job group ID
- getBatchDescriptionString for the job group description (to display in web UI)
- interruptOnCancel flag on
Note
runStream uses the SparkSession to access SparkContext and assign the job group id.
Learn more about SparkContext.setJobGroup method in The Internals of Apache Spark online book.
runStream sets sql.streaming.queryId local property to id.
runStream requests the MetricsSystem to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is enabled.
runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).

runStream unblocks the main starting thread (by decrementing the count of the startLatch that when 0 lets the starting thread continue).
runStream updates the status message to be Initializing sources.
runStream initializes the analyzed logical plan.
Lazy Value
The analyzed logical plan is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.
runStream disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).
runStream creates a new "zero" OffsetSeqMetadata.
When in INITIALIZING state, runStream enters ACTIVE state:
- Decrements the count of initializationLatch
Note
runBatches does the main work only when first started (when in INITIALIZING state).
runStream...FIXME (describe the failed and stop states)
Once TriggerExecutor has finished executing batches, runBatches updates the status message to Stopped.
NOTE: TriggerExecutor finishes executing batches when the batch runner reports that the streaming query is no longer active.
finally Block¶
runStream releases the startLatch and initializationLatch latches.
runStream stops the streaming sources (stopSources).
runStream enters TERMINATED state.
runStream sets the StreamingQueryStatus with the isTriggerActive and isDataAvailable flags off (false).
runStream removes the stream metrics reporter from the application's MetricsSystem.
runStream requests the StreamingQueryManager to handle termination of a streaming query.
runStream creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.
With the deleteCheckpointOnStop flag enabled and no StreamingQueryException, runStream deletes the checkpoint directory.
In the end, runStream releases the terminationLatch latch.
TriggerExecutor's Batch Runner¶
Batch Runner (batchRunner) is an executable block executed by TriggerExecutor in runBatches.
batchRunner starts trigger calculation.
As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.
In triggerExecution time-tracking section, runBatches branches off per currentBatchId:
- For currentBatchId < 0:
  - populateStartOffsets
  - Setting Job Description as getBatchDescriptionString
  - Printing out the following DEBUG message to the logs: Stream running from [committedOffsets] to [availableOffsets]
- For currentBatchId >= 0:
If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.
Tip
You can check out the status of a streaming query using status method.
scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for next trigger",
"isDataAvailable" : false,
"isTriggerActive" : false
}
batchRunner then updates the status message to Processing new data and runs the current streaming batch.

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).
When there was data available in the sources, batchRunner updates the committed offsets.
batchRunner prints out the following DEBUG message to the logs:
batch [currentBatchId] committed
batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.
When no data was available in the sources to process, batchRunner does the following:
- Marks currentStatus with isDataAvailable disabled
- Updates the status message to Waiting for data to arrive
- Sleeps the current thread for pollingDelayMs milliseconds
batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not).
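The control flow of a single trigger can be condensed into a toy model. It is a sketch only: `QuerySketch` and the `pendingRows` parameter are hypothetical, the actual batch execution is reduced to a comment, and only the status messages and the batch ID rules are taken from this page.

```scala
object BatchRunnerSketch {
  // Hypothetical model of the per-trigger bookkeeping of a streaming query.
  final class QuerySketch(pollingDelayMs: Long = 10L) {
    var currentBatchId: Long = -1L
    var statusMessage: String = "Initializing sources"
    var isDataAvailable: Boolean = false
    @volatile var terminated: Boolean = false

    // One trigger; `pendingRows` stands in for whatever the sources report as new data.
    // Returns whether the query is still active.
    def batchRunner(pendingRows: Int): Boolean = {
      if (!terminated) {
        // First trigger with an empty offset log: start from batch 0.
        if (currentBatchId < 0) currentBatchId = 0
        isDataAvailable = pendingRows > 0
        if (isDataAvailable) {
          statusMessage = "Processing new data"
          // ... run the streaming batch and commit it ...
          currentBatchId += 1
        } else {
          statusMessage = "Waiting for data to arrive"
          Thread.sleep(pollingDelayMs)
        }
      }
      statusMessage = "Waiting for next trigger"
      !terminated
    }
  }

  def main(args: Array[String]): Unit = {
    val query = new QuerySketch()
    Seq(0, 5, 0, 3).foreach(query.batchRunner)
    println(s"next batch id: ${query.currentBatchId}, status: ${query.statusMessage}")
  }
}
```

A trigger executor would keep calling `batchRunner` until it returns false, which is how the boolean result is meant to be consumed.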
Path to Checkpoint Directory¶
checkpointFile(
name: String): String
checkpointFile gives the path of a directory with name in checkpoint directory.
checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
Posting StreamingQueryListener Event¶
postEvent(
event: StreamingQueryListener.Event): Unit
postEvent is a part of the ProgressReporter abstraction.
postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).
Note
postEvent uses SparkSession to access the current StreamingQueryManager.
postEvent is used when:
- ProgressReporter is requested to report update progress (while finishing a trigger)
- StreamExecution runs streaming batches (and announces starting a streaming query by posting a QueryStartedEvent and query termination by posting a QueryTerminatedEvent)
Waiting Until No New Data Available in Sources or Query Has Been Terminated¶
processAllAvailable(): Unit
processAllAvailable is a part of the StreamingQuery abstraction.
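processAllAvailable reports (throws) the StreamingQueryException the streaming query terminated with, if any.
NOTE: The exception is recorded when StreamExecution is requested to run stream processing (and terminates with an exception).
processAllAvailable returns immediately when StreamExecution is no longer active (in TERMINATED state).
processAllAvailable acquires a lock on the awaitProgressLock and turns the noNewData flag off (false).
processAllAvailable keeps polling with 10-second pauses (locked on the awaitProgressLock) until the noNewData flag is turned on (true) or StreamExecution is no longer active (in TERMINATED state).
NOTE: The 10-second pause is hardcoded and cannot be changed.
In the end, processAllAvailable releases the awaitProgressLock lock.
processAllAvailable throws an IllegalStateException when executed on the stream execution thread: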
Cannot wait for a query state from the same thread that is running the query
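The waiting protocol of processAllAvailable can be sketched with a fair ReentrantLock and a condition. This is a model of the protocol only, not Spark's implementation: `AwaitProgressSketch`, `reportNoNewData`, `terminate` and the configurable `pollMillis` are hypothetical, and exception reporting and the same-thread check are left out.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical model of the waiting protocol only.
final class AwaitProgressSketch(pollMillis: Long = 10000L) {
  private val awaitProgressLock = new ReentrantLock(true) // fair lock
  private val progressMade = awaitProgressLock.newCondition()
  private var noNewData = false // guarded by awaitProgressLock
  @volatile private var active = true

  private def locked[T](body: => T): T = {
    awaitProgressLock.lock()
    try body finally awaitProgressLock.unlock()
  }

  // Called by the (simulated) query thread when a batch found nothing to process.
  def reportNoNewData(): Unit = locked { noNewData = true; progressMade.signalAll() }

  // Called when the (simulated) query terminates.
  def terminate(): Unit = locked { active = false; progressMade.signalAll() }

  // Blocks until no new data is left or the query is no longer active.
  def processAllAvailable(): Unit =
    if (active) locked {
      noNewData = false
      while (!noNewData && active) {
        // Wake up on a signal or after the pause, then re-check both conditions.
        progressMade.await(pollMillis, TimeUnit.MILLISECONDS)
      }
    }
}

object AwaitProgressSketch {
  def main(args: Array[String]): Unit = {
    val query = new AwaitProgressSketch(pollMillis = 100L)
    // Simulated query thread that keeps finding no new data.
    val queryThread = new Thread(() => {
      while (true) { Thread.sleep(20); query.reportNoNewData() }
    })
    queryThread.setDaemon(true)
    queryThread.start()
    query.processAllAvailable()
    println("all available data processed")
  }
}
```

Resetting the flag under the lock before waiting means a report that arrived before the call does not count, so the caller only returns after a batch that completed during the wait.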
Stream Execution Thread¶
queryExecutionThread: QueryExecutionThread
queryExecutionThread is a Java thread of execution (java.util.Thread) that runs a streaming query.
queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start. At that time, start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):
Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
When started, queryExecutionThread sets the call site and runs stream processing.
queryExecutionThread uses the name stream execution thread for [id] (where [id] is the prettyIdString, i.e. queryName [id = [id], runId = [runId]]).
queryExecutionThread is a QueryExecutionThread that is a custom UninterruptibleThread from Apache Spark with runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().
Tip
Use Java's jconsole or jstack to monitor stream execution threads.
$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...
Batch Metadata¶
offsetSeqMetadata: OffsetSeqMetadata
offsetSeqMetadata is part of the ProgressReporter abstraction.
StreamExecution creates an OffsetSeqMetadata when created.
Most importantly, the OffsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.
offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.
offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.
NOTE: MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.
offsetSeqMetadata is stored (checkpointed) in the write-ahead offset log by MicroBatchExecution (and printed out as an INFO message to the logs).
offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to populate start offsets.
isActive¶
isActive: Boolean
isActive is part of the StreamingQuery abstraction.
isActive is enabled (true) as long as the State is not TERMINATED.
Human-Readable HTML Description of Spark Jobs (for web UI)¶
getBatchDescriptionString: String
getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name if defined, the id, the runId and batchDescription that can be init (for a negative current batch ID) or the current batch ID itself.
getBatchDescriptionString is of the following format:
[name]
id = [id]
runId = [runId]
batch = [batchDescription]

getBatchDescriptionString is used when:
- MicroBatchExecution stream execution engine is requested to run an activated streaming query (as the job description of any Spark jobs triggered as part of query execution)
- StreamExecution is requested to run stream processing (as the job group description of any Spark jobs triggered as part of query execution)
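The format above can be reproduced with a few lines of Scala. The function below is a hypothetical sketch: it takes the values as parameters instead of reading them from a query, and it joins the parts with plain line breaks rather than HTML markup.

```scala
import java.util.UUID

object BatchDescriptionSketch {
  // Hypothetical helper that assembles the description from its four parts.
  def batchDescriptionString(name: String, id: UUID, runId: UUID, currentBatchId: Long): String = {
    // A negative batch ID means that no batch has been constructed yet.
    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
    Seq(
      Option(name).getOrElse(""), // the name is optional and may be null
      s"id = $id",
      s"runId = $runId",
      s"batch = $batchDescription"
    ).mkString("\n")
  }

  def main(args: Array[String]): Unit =
    println(batchDescriptionString("debug", UUID.randomUUID(), UUID.randomUUID(), -1L))
}
```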
No New Data Available¶
noNewData: Boolean
noNewData is a flag that indicates that a batch has completed with no new data left and processAllAvailable could stop waiting till all streaming data is processed.
Default: false
Turned on (true) by the MicroBatchExecution and ContinuousExecution stream execution engines.
Turned off (false) by the MicroBatchExecution stream execution engine and when StreamExecution is requested to processAllAvailable.
Current Batch ID¶
- -1 when StreamExecution is created
- 0 when StreamExecution populates start offsets (and OffsetSeqLog is empty, i.e. no offset files in offsets directory in checkpoint)
- Incremented when StreamExecution runs streaming batches and finishes a trigger that had data available from sources (right after committing the batch)
newData Registry¶
newData: Map[BaseStreamingSource, LogicalPlan]
Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.
newData is part of the ProgressReporter abstraction.
Set when StreamExecution is requested to request unprocessed data from streaming sources (while running a single streaming batch)
MetricsReporter¶
StreamExecution defines a MetricsReporter to report streaming metrics.
MetricsReporter is created with the following source name (with name if defined or id):
spark.streaming.[name or id]
MetricsReporter is registered only when spark.sql.streaming.metricsEnabled configuration property is enabled (when StreamExecution is requested to run).
MetricsReporter is deactivated (removed) when a streaming query is stopped (when StreamExecution is requested to runStream).
Latches¶
StreamExecution uses java.util.concurrent.CountDownLatches (with count 1).
initializationLatch¶
Counted down when requested to runStream:
- Changes state from INITIALIZING to ACTIVE just before runActivatedStream
- In runStream's finally block
Awaited for tests only (which seems to indicate that it is a test-only latch)
startLatch¶
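Counted down when requested to runStream:
- Right after posting a QueryStartedEvent (to unblock the starting thread)
- In runStream's finally block
Awaited when requested to start (to pause the main thread until StreamExecution was requested to run the streaming query on a separate thread)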
terminationLatch¶
Counted down at the end of runStream
Awaited when requested to awaitTermination (that pauses the thread until the streaming query has finished successfully or not).
Locks¶
awaitProgressLock¶
StreamExecution uses a fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention).
__is_continuous_processing Local Property¶
StreamExecution uses __is_continuous_processing local property (default: false) to differentiate between ContinuousExecution (true) and MicroBatchExecution (false). The property is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).
Demo¶
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL
Refer to Logging.