StreamExecution¶
StreamExecution
is an abstraction of stream execution engines (streaming query processing engines) that can run a structured query (on a stream execution thread).
Note
Continuous query, streaming query, continuous Dataset, streaming Dataset are all considered high-level synonyms for an executable entity that stream execution engines run using the analyzed logical plan internally.
Important
StreamExecution
does not support adaptive query execution or the cost-based optimizer (and turns them off when requested to run stream processing).
StreamExecution
is the execution environment of a streaming query that is executed every trigger and in the end adds the results to a sink.
StreamExecution
corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
readStream.
format("rate").
load.
writeStream.
format("console").
trigger(Trigger.ProcessingTime(10.minutes)).
start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery
// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Contract¶
Logical Plan¶
logicalPlan: LogicalPlan
Analyzed logical plan of the streaming query to execute
Used when StreamExecution
is requested to run stream processing
logicalPlan
is part of the ProgressReporter abstraction.
Running Activated Streaming Query¶
runActivatedStream(
sparkSessionForStream: SparkSession): Unit
Executes (runs) the activated streaming query (that is described by the logical plan)
Used when StreamExecution
is requested to run the streaming query (when transitioning from INITIALIZING
to ACTIVE
state)
Implementations¶

- ContinuousExecution
- MicroBatchExecution
Creating Instance¶
StreamExecution
takes the following to be created:
- SparkSession
- Name of the streaming query (can be null)
- Path of the checkpoint directory (metadata directory)
- Streaming query (not used due to logicalPlan)
- Table (Spark SQL)
- Trigger
- Clock
- OutputMode
- deleteCheckpointOnStop flag (whether to delete the checkpoint directory on stop)
Abstract Class
StreamExecution
is an abstract class and cannot be created directly. It is created indirectly as one of the concrete StreamExecutions (see Implementations above).
Demo¶
// sq is an already-started streaming query (e.g. started with DataStreamWriter.start)
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
Configuration Properties¶
spark.sql.streaming.minBatchesToRetain¶
StreamExecution
uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).
spark.sql.streaming.pollingDelay¶
StreamExecution
uses spark.sql.streaming.pollingDelay configuration property to control how long to delay polling for new data (when no data was available to process in a batch).
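For example, both properties can be set when building the SparkSession. A sketch only; the values shown are the assumed defaults:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StreamExecution demo")
  // how many offset/commit log entries to keep (assumed default: 100)
  .config("spark.sql.streaming.minBatchesToRetain", "100")
  // how long to wait before polling again when no data was available (assumed default: 10ms)
  .config("spark.sql.streaming.pollingDelay", "10ms")
  .getOrCreate()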
ProgressReporter¶
StreamExecution
is a ProgressReporter and reports status of the streaming query (when it starts, progresses and terminates) by posting StreamingQueryListener
events.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
.readStream
.text("server-logs")
.writeStream
.format("console")
.queryName("debug")
.trigger(Trigger.ProcessingTime(20.seconds))
.start
// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
"id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
"runId" : "920b227e-6d02-4a03-a271-c62120258cea",
"name" : "debug",
"timestamp" : "2017-06-18T19:21:07.693Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 5,
"triggerExecution" : 9
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
}
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
Unique Streaming Sources¶
uniqueSources: Map[SparkDataStream, ReadLimit]
StreamExecution
tracks the unique streaming data sources of a streaming query in the uniqueSources
internal registry.
uniqueSources is used when StreamExecution:

- Constructs the next streaming micro-batch (and gets new offsets for every streaming data source)
- Stops all streaming data sources
Streaming Query Identifiers¶
The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:
- name is optional and user-defined
- id is a UUID that is auto-generated once, when StreamExecution is created, and persisted to the metadata checkpoint file
- runId is a UUID that is auto-generated every time StreamExecution is created
Id¶
StreamExecution
is uniquely identified by an ID of the streaming query (which is the id
of the StreamMetadata).
Since the StreamMetadata is persisted (to the metadata
file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.
Run Id¶
StreamExecution
is uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution
is created.
runId
does not "survive" query restarts and will always be different yet unique (across all active queries).
StreamMetadata¶
StreamExecution
uses a StreamMetadata that is persisted in the metadata
file in the checkpoint directory.
If the metadata file is available, it is read to recover the id of the streaming query when it is resumed (i.e. restarted after a failure or a planned stop).
Metadata Logs¶
Write-Ahead Offset Log¶
offsetLog: OffsetSeqLog
offsetLog
is a Hadoop DFS-based metadata storage (of OffsetSeqs) with offsets
metadata directory.
offsetLog
is used as a Write-Ahead Log of Offsets to persist offsets of the data about to be processed in every trigger.
Tip
Monitor offsets
and commits
metadata logs to know the progress of a streaming query.
The number of entries in the OffsetSeqLog
is controlled using spark.sql.streaming.minBatchesToRetain configuration property.
offsetLog
is used when:
- ContinuousExecution stream execution engine is requested to commit an epoch, getStartOffsets, and addOffset
- MicroBatchExecution stream execution engine is requested to populate start offsets and construct (or skip) the next streaming micro-batch
Offset Commit Log¶
StreamExecution
uses offset commit log (CommitLog with commits
metadata checkpoint directory) for streaming batches successfully executed (with a single file per batch with a file name being the batch id) or committed epochs.
Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.
commitLog is used by the stream execution engines:

- MicroBatchExecution is requested to run an activated streaming query (that in turn requests the commit log for the last committed batch at the very beginning of the streaming query execution and then commits a batch after every successful micro-batch)
- ContinuousExecution is requested to run a streaming query in continuous mode (that in turn requests the commit log for the last committed epoch at the very beginning of the streaming query execution and then commits an epoch regularly)
State of Streaming Query¶
state: AtomicReference[State]
state
indicates the internal state of execution of the streaming query (as java.util.concurrent.atomic.AtomicReference).
ACTIVE¶
StreamExecution has been requested to run stream processing (and the activated streaming query is running)
INITIALIZING¶
StreamExecution
has been created.
TERMINATED¶
Indicates that:

- MicroBatchExecution has been requested to stop
- ContinuousExecution has been requested to stop
- StreamExecution has been requested to run stream processing (and has finished running the activated streaming query)
RECONFIGURING¶
Used when ContinuousExecution
is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)
Creating StreamingWrite¶
createStreamingWrite(
table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite
createStreamingWrite
creates a LogicalWriteInfoImpl
(with the query ID, the schema of the input LogicalPlan
and the given options).
createStreamingWrite
requests the given SupportsWrite
table for a WriteBuilder
(for the LogicalWriteInfoImpl
).
Tip
Learn more about SupportsWrite and WriteBuilder in The Internals of Spark SQL online book.
createStreamingWrite
branches based on the OutputMode:
- For Append output mode, createStreamingWrite requests the WriteBuilder to build a StreamingWrite
- For Complete output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsTruncate and requests it to truncate followed by buildForStreaming
- For Update output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsStreamingUpdate and requests it to update followed by buildForStreaming
Tip
Learn more about SupportsTruncate and SupportsStreamingUpdate in The Internals of Spark SQL online book.
createStreamingWrite
is used when MicroBatchExecution and ContinuousExecution stream execution engines are requested for analyzed logical plans.
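The branching above can be sketched as follows. This is a paraphrase, not the exact Spark source (SupportsStreamingUpdate is an internal Spark SQL trait):

import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.sources.SupportsStreamingUpdate
import org.apache.spark.sql.streaming.OutputMode

// paraphrased sketch of the per-output-mode branching
def buildStreamingWrite(writeBuilder: WriteBuilder, outputMode: OutputMode): StreamingWrite =
  outputMode match {
    case Append =>
      writeBuilder.buildForStreaming()
    case Complete =>
      // assumes the WriteBuilder supports truncation
      writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
    case Update =>
      // assumes the WriteBuilder supports streaming update
      writeBuilder.asInstanceOf[SupportsStreamingUpdate].update().buildForStreaming()
  }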
Available Offsets (StreamProgress)¶
availableOffsets: StreamProgress
availableOffsets
is a registry of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and have not yet been committed).
availableOffsets
works in tandem with the committedOffsets internal registry.
availableOffsets
is empty when StreamExecution
is created (i.e. no offsets are reported for any streaming source in the streaming query).
availableOffsets
is used when:
- MicroBatchExecution stream execution engine is requested to populate start offsets, check whether new data is available, construct the next streaming micro-batch, and run a single streaming micro-batch
- ContinuousExecution stream execution engine is requested to commit an epoch
- StreamExecution is requested for the internal string representation
Committed Offsets (StreamProgress)¶
committedOffsets: StreamProgress
committedOffsets
is a registry of offsets per streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.
committedOffsets
works in tandem with the availableOffsets internal registry.
committedOffsets
is used when:
- MicroBatchExecution stream execution engine is requested to populate start offsets, check whether new data is available, and run a single streaming micro-batch
- ContinuousExecution stream execution engine is requested for the start offsets and to commit an epoch
- StreamExecution is requested for the internal string representation
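Conceptually, the two registries are maps of source to offset, and a trigger processes the delta between them. A purely illustrative sketch (not the actual StreamProgress API):

// purely illustrative: offsets modeled as Longs keyed by source name
val committedOffsets = Map("rate" -> 10L) // already processed and committed
val availableOffsets = Map("rate" -> 15L) // known to be available

// the next batch processes the range (committed, available] per source
val pending = availableOffsets.map { case (source, available) =>
  source -> (committedOffsets.getOrElse(source, -1L), available)
}
// pending: Map(rate -> (10,15))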
Fully-Qualified (Resolved) Path to Checkpoint Root Directory¶
resolvedCheckpointRoot: String
resolvedCheckpointRoot
is a fully-qualified path of the given checkpoint root directory.
The given checkpoint root directory is defined using the checkpointLocation option or, together with the queryName option, the spark.sql.streaming.checkpointLocation configuration property.
checkpointLocation
and queryName
options are defined when StreamingQueryManager
is requested to create a streaming query.
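Two equivalent ways to define the checkpoint root (paths are hypothetical):

// 1. Per query, with the checkpointLocation option
spark.readStream.format("rate").load
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/my-query")
  .start

// 2. Globally, with the configuration property plus a query name
// (the checkpoint directory becomes /tmp/checkpoints/my-query)
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
spark.readStream.format("rate").load
  .writeStream
  .format("console")
  .queryName("my-query")
  .start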
resolvedCheckpointRoot
is used when creating the path to the checkpoint directory and when StreamExecution
finishes running streaming batches.
resolvedCheckpointRoot is used for the logicalPlan (while transforming the analyzedPlan and planning StreamingRelation logical operators to the corresponding StreamingExecutionRelation operators, with the streaming data sources created with the path to the sources directory to store checkpointing metadata).
resolvedCheckpointRoot is printed out immediately when resolved as an INFO message to the logs:
Checkpoint root [checkpointRoot] resolved to [resolvedCheckpointRoot].
resolvedCheckpointRoot is printed out again as an INFO message to the logs when StreamExecution is started:
Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
StreamWriterCommitProgress¶
sinkCommitProgress: Option[StreamWriterCommitProgress]
sinkCommitProgress
is part of the ProgressReporter abstraction.
StreamExecution
initializes sinkCommitProgress
registry to be None
when created.
Last Query Execution Of Streaming Query (IncrementalExecution)¶
lastExecution: IncrementalExecution
lastExecution
is part of the ProgressReporter abstraction.
lastExecution is an IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.
lastExecution is created when the stream execution engines are requested to plan a batch for execution:

- MicroBatchExecution is requested to run a single streaming micro-batch (when in the queryPlanning phase)
- ContinuousExecution stream execution engine is requested to run a streaming query in continuous mode (when in the queryPlanning phase)
lastExecution is used when:

- StreamExecution is requested to explain the streaming query (via explainInternal)
- ProgressReporter is requested to extractStateOperatorMetrics, extractExecutionStats, and extractSourceToNumInputRows
- MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (based on the StateStoreWriters in a streaming query) and run a single streaming micro-batch
- ContinuousExecution stream execution engine is requested to run a streaming query in continuous mode
- For debugging query execution of streaming queries (using debugCodegen)
Explaining Streaming Query¶
explain(): Unit
explain(extended: Boolean): Unit

With the extended flag off (false), explain simply prints out the physical plan of the streaming query.
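For example, with the se StreamExecution from the demo above:

se.explain()                // physical plan only
se.explain(extended = true) // all query plans (logical and physical)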
Stopping Streaming Sources and Readers¶
stopSources(): Unit
stopSources
requests every streaming source to stop.
In case of a non-fatal exception, stopSources prints out the following WARN message to the logs:
Failed to stop streaming source: [source]. Resources may have leaked.
stopSources
is used when:
- StreamExecution is requested to run stream processing (and it has terminated, successfully or not)
- ContinuousExecution is requested to run a streaming query in continuous mode (and it terminates)
Running Stream Processing¶
runStream(): Unit
runStream
simply prepares the environment to execute the activated streaming query.
runStream
is used when the stream execution thread is requested to start (when DataStreamWriter
is requested to start an execution of the streaming query).
Internally, runStream
sets the job group (to all the Spark jobs started by this thread) as follows:
-
runId for the job group ID
-
getBatchDescriptionString for the job group description (to display in web UI)
-
interruptOnCancel
flag on
Note
runStream
uses the SparkSession to access SparkContext
and assign the job group id.
Learn more about SparkContext.setJobGroup method in The Internals of Apache Spark online book.
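A sketch of that assignment using the public SparkContext.setJobGroup API (the runId and description values are illustrative stand-ins for the internals):

import java.util.UUID
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val runId = UUID.randomUUID() // stands in for StreamExecution.runId
val batchDescription = "queryName<br/>id = ...<br/>runId = ...<br/>batch = init" // illustrative HTML description

spark.sparkContext.setJobGroup(
  runId.toString,           // job group ID
  batchDescription,         // job group description (shown in the web UI)
  interruptOnCancel = true) // allow interrupting the query's Spark jobs on cancel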
runStream
sets sql.streaming.queryId
local property to id.
runStream
requests the MetricsSystem
to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is enabled.
runStream
notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).
runStream unblocks the main starting thread (by decrementing the count of the startLatch which, once at 0, lets the starting thread continue).
runStream
updates the status message to be Initializing sources.
runStream
initializes the analyzed logical plan.
Lazy Value
The analyzed logical plan is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.
runStream
disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled
and spark.sql.cbo.enabled
configuration properties off, respectively).
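In effect it is as if the following were applied to sparkSessionForStream (the session given to runActivatedStream); a sketch of the outcome, not the actual code path:

// the two properties that get turned off for the streaming session
sparkSessionForStream.conf.set("spark.sql.adaptive.enabled", "false")
sparkSessionForStream.conf.set("spark.sql.cbo.enabled", "false")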
runStream
creates a new "zero" OffsetSeqMetadata.
(when in INITIALIZING state) runStream enters ACTIVE state:

- Decrements the count of initializationLatch
- Runs the activated streaming query (runActivatedStream)

Note

runStream does the main work only when first started (i.e. when in INITIALIZING state).
runStream
...FIXME (describe the failed and stop states)
Once TriggerExecutor has finished executing batches, runStream updates the status message to Stopped.

NOTE: TriggerExecutor finishes executing batches when the batch runner reports that the streaming query is stopped (i.e. no longer active).
finally Block¶
runStream
releases the startLatch and initializationLatch latches.
runStream stops the streaming sources (stopSources).
runStream
enters TERMINATED state.
runStream
sets the StreamingQueryStatus with the isTriggerActive
and isDataAvailable
flags off (false
).
runStream
removes the stream metrics reporter from the application's MetricsSystem
.
runStream
requests the StreamingQueryManager
to handle termination of a streaming query.
runStream
creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.
With the deleteCheckpointOnStop flag enabled and no StreamingQueryException
, runStream
deletes the checkpoint directory.
In the end, runStream
releases the terminationLatch latch.
TriggerExecutor's Batch Runner¶
Batch Runner (batchRunner
) is an executable block executed by TriggerExecutor in runBatches.
batchRunner
starts trigger calculation.
As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner
executes the streaming batch for the trigger.
In the triggerExecution time-tracking section, the batch runner branches off per currentBatchId:

- For currentBatchId < 0:
    - populateStartOffsets
    - Setting the job description as getBatchDescriptionString
    - Printing out the following DEBUG message to the logs: Stream running from [committedOffsets] to [availableOffsets]
- For currentBatchId >= 0:
    - Constructing the next streaming micro-batch
If there is data available in the sources, batchRunner
marks currentStatus with isDataAvailable
enabled.
Tip
You can check out the status of a streaming query using status method.
scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for next trigger",
"isDataAvailable" : false,
"isTriggerActive" : false
}
batchRunner
then updates the status message to Processing new data and runs the current streaming batch.
After triggerExecution section has finished, batchRunner
finishes the streaming batch for the trigger (and collects query execution statistics).
When there was data available in the sources, batchRunner updates committed offsets (by adding the current batch to the commit log and the available offsets to the committedOffsets registry).
batchRunner
prints out the following DEBUG message to the logs:
batch [currentBatchId] committed
batchRunner
increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.
When no data was available in the sources to process, batchRunner
does the following:
- Marks currentStatus with isDataAvailable disabled
- Updates the status message to Waiting for data to arrive
- Sleeps the current thread for pollingDelayMs milliseconds
batchRunner
updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not)
Starting Streaming Query (on Stream Execution Thread)¶
start(): Unit
start
starts a stream execution thread that simply runs stream processing (and hence the streaming query).
start
prints out the following INFO message to the logs:
Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
start then starts the stream execution thread (as a daemon thread).

NOTE: start uses Java's java.lang.Thread.start to run the streaming query on a separate execution thread.

NOTE: When started, a streaming query runs in its own execution thread on JVM.

In the end, start pauses the main thread (using the startLatch) until StreamExecution is requested to run stream processing (which counts the latch down and thereby unblocks the caller).
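The thread-and-latch handshake can be sketched as follows (illustrative only; the names mirror the internals):

import java.util.concurrent.CountDownLatch

val startLatch = new CountDownLatch(1)

val queryExecutionThread = new Thread("stream execution thread") {
  override def run(): Unit = {
    startLatch.countDown() // unblock the thread that called start()
    // ... runStream() would execute the streaming query here ...
  }
}
queryExecutionThread.setDaemon(true)
queryExecutionThread.start()

startLatch.await() // start() returns only once the query thread is up and running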
start
is used when StreamingQueryManager
is requested to start a streaming query (when DataStreamWriter
is requested to start an execution of the streaming query).
Path to Checkpoint Directory¶
checkpointFile(
name: String): String
checkpointFile gives the path of a directory with the given name in the checkpoint directory.
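A sketch of the resolution (assuming Hadoop Path semantics and a hypothetical checkpoint root):

import org.apache.hadoop.fs.Path

val resolvedCheckpointRoot = "file:/tmp/demo-checkpoint" // hypothetical

// the child directory is resolved against the checkpoint root
def checkpointFile(name: String): String =
  new Path(new Path(resolvedCheckpointRoot), name).toString

checkpointFile("offsets") // file:/tmp/demo-checkpoint/offsets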
checkpointFile
is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
Posting StreamingQueryListener Event¶
postEvent(
event: StreamingQueryListener.Event): Unit
postEvent
is a part of the ProgressReporter abstraction.
postEvent
simply requests the StreamingQueryManager
to post the input event (to the StreamingQueryListenerBus in the current SparkSession
).
Note
postEvent
uses SparkSession
to access the current StreamingQueryManager
.
postEvent
is used when:
- ProgressReporter is requested to report update progress (while finishing a trigger)
- StreamExecution runs streaming batches (and announces starting a streaming query by posting a QueryStartedEvent and query termination by posting a QueryTerminatedEvent)
Waiting Until No New Data Available in Sources or Query Has Been Terminated¶
processAllAvailable(): Unit
processAllAvailable
is a part of the StreamingQuery abstraction.
processAllAvailable reports the streamDeathCause exception if the streaming query has died due to an error.

NOTE: streamDeathCause is set exclusively when StreamExecution is requested to run stream processing (and the query fails with an exception).

processAllAvailable returns immediately when StreamExecution is no longer active (in TERMINATED state).

processAllAvailable acquires a lock on the awaitProgressLock and turns the noNewData internal flag off (false).

processAllAvailable keeps polling with 10-second pauses (locked on awaitProgressLockCondition) until the noNewData flag is turned on (true) or StreamExecution is no longer active (in TERMINATED state).

NOTE: The 10-second pause is hardcoded and cannot be changed.

In the end, processAllAvailable releases the awaitProgressLock lock.

processAllAvailable throws an IllegalStateException when executed on the stream execution thread:
Cannot wait for a query state from the same thread that is running the query
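A typical use is in tests and demos to block until all the input has been processed. A sketch using the test-oriented MemoryStream source (assuming a SparkSession named spark, as in spark-shell):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlCtx: SQLContext = spark.sqlContext
val input = MemoryStream[Int]

val sq = input.toDF
  .writeStream
  .format("memory")
  .queryName("demo")
  .start

input.addData(1, 2, 3)
sq.processAllAvailable() // returns once the three rows have been processed
spark.table("demo").show()
sq.stop()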
Stream Execution Thread¶
queryExecutionThread: QueryExecutionThread
queryExecutionThread
is a Java thread of execution (java.util.Thread) that runs a streaming query.
queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start. At that time, start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

When started, queryExecutionThread sets the call site and runs the stream processing (runStream).

queryExecutionThread uses the name stream execution thread for [prettyIdString] (with prettyIdString being the optional queryName followed by [id = [id], runId = [runId]]).
queryExecutionThread is a QueryExecutionThread, a custom UninterruptibleThread from Apache Spark with a runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().
Tip
Use Java's jconsole or jstack to monitor stream execution threads.
$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...
Current Batch Metadata (Event-Time Watermark and Timestamp)¶
offsetSeqMetadata: OffsetSeqMetadata
offsetSeqMetadata is an OffsetSeqMetadata.
offsetSeqMetadata
is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.
offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.

NOTE: MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in the write-ahead offset log by MicroBatchExecution (and printed out as an INFO message to the logs).

offsetSeqMetadata is restored (re-created) from the checkpointed state when MicroBatchExecution is requested to populate start offsets.
offsetSeqMetadata
is part of the ProgressReporter abstraction.
isActive¶
isActive: Boolean
isActive
is part of the StreamingQuery abstraction.
isActive
is enabled (true
) as long as the State is not TERMINATED.
Human-Readable HTML Description of Spark Jobs (for web UI)¶
getBatchDescriptionString: String
getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name (if defined), the id, the runId and a batchDescription that is either init (when the current batch ID is negative, i.e. not started yet) or the current batch ID itself.
getBatchDescriptionString
is of the following format:
[name]
id = [id]
runId = [runId]
batch = [batchDescription]
getBatchDescriptionString is used when:

- MicroBatchExecution stream execution engine is requested to run an activated streaming query (as the job description of any Spark jobs triggered as part of query execution)
- StreamExecution is requested to run stream processing (as the job group description of any Spark jobs triggered as part of query execution)
No New Data Available¶
noNewData: Boolean
noNewData is a flag that indicates that a batch has completed with no new data left, so processAllAvailable can stop waiting until all streaming data is processed.
Default: false
Turned on (true) when:

- MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (while requesting the latest offsets from the sources)
- ContinuousExecution stream execution engine is requested to commit an epoch

Turned off (false) when:

- MicroBatchExecution stream execution engine is requested to run an activated streaming query (right after the triggerExecution phase)
- StreamExecution is requested to processAllAvailable
Current Batch ID¶
- -1 when StreamExecution is created
- 0 when StreamExecution populates start offsets (and the OffsetSeqLog is empty, i.e. there are no offset files in the offsets directory in the checkpoint)
- Incremented when StreamExecution runs streaming batches and finishes a trigger that had data available from sources (right after committing the batch)
newData Registry¶
newData: Map[BaseStreamingSource, LogicalPlan]
Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame
.
newData
is part of the ProgressReporter abstraction.
Set when StreamExecution requests unprocessed data from the streaming sources (while running a single streaming batch)

Used when StreamExecution transforms the logical plan (of the streaming query) to include the Sources and the MicroBatchReaders with new data (while running a single streaming batch)
Streaming Metrics¶
StreamExecution
uses MetricsReporter for reporting streaming metrics.
MetricsReporter
is created with the following source name (with name if defined or id):
spark.streaming.[name or id]
MetricsReporter
is registered only when spark.sql.streaming.metricsEnabled configuration property is enabled (when StreamExecution
is requested to runStream).
MetricsReporter
is deactivated (removed) when a streaming query is stopped (when StreamExecution
is requested to runStream).
Latches¶
StreamExecution
uses java.util.concurrent.CountDownLatches (with count 1
).
initializationLatch¶
Counted down when requested to runStream:
- Changes state from INITIALIZING to ACTIVE just before runActivatedStream
- In runStream's finally block
Awaited for tests only (which seems to indicate that it is a test-only latch)
startLatch¶
Counted down when requested to runStream

Awaited when requested to start (to pause the main thread until StreamExecution was requested to run the streaming query on a separate thread)
terminationLatch¶
Counted down at the end of runStream
Awaited when requested to awaitTermination (that pauses the thread until the streaming query has finished successfully or not).
Locks¶
awaitProgressLock¶
StreamExecution uses a fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention).
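A sketch of creating such a lock, together with the awaitProgressLockCondition condition variable used for the polling in processAllAvailable:

import java.util.concurrent.locks.ReentrantLock

// fair = true grants the lock to the longest-waiting thread
val awaitProgressLock = new ReentrantLock(true)
val awaitProgressLockCondition = awaitProgressLock.newCondition()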
__is_continuous_processing Local Property¶
StreamExecution uses the __is_continuous_processing local property (default: false) to differentiate between ContinuousExecution (true) and MicroBatchExecution (false), which is used when StateStoreRDD is requested to compute a partition (and find a StateStore for a given version).
Logging¶
Enable ALL
logging level for org.apache.spark.sql.execution.streaming.StreamExecution
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL
Refer to Logging.