StreamExecution¶
StreamExecution is an abstraction of stream execution engines (streaming query processing engines) that run a structured query (on a stream execution thread).
Note
Continuous query, streaming query, continuous Dataset, streaming Dataset are all considered high-level synonyms for an executable entity that stream execution engines run using the analyzed logical plan internally.
Important
StreamExecution does not support adaptive query execution or the cost-based optimizer (and turns them off when requested to run stream processing).
StreamExecution is the execution environment of a streaming query that is executed every trigger and, in the end, adds the results to a sink.

StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.
```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start

scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution from StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
```
Contract¶
Logical Plan¶
logicalPlan: LogicalPlan
logicalPlan is part of the ProgressReporter abstraction.

logicalPlan is the analyzed logical plan of the streaming query to execute.
Note
No idea why logicalPlan is part of StreamExecution since there's (almost) no change from the parent ProgressReporter except the access qualifier (from protected to public). Could this be the only reason?
Used when:

- StreamExecution is requested to run stream processing
Running Activated Streaming Query¶
runActivatedStream(
sparkSessionForStream: SparkSession): Unit
Executes (runs) the activated streaming query (that is described by the logical plan).

Used when:

- StreamExecution is requested to run the streaming query (when transitioning from INITIALIZING to ACTIVE state)
Implementations¶

- ContinuousExecution
- MicroBatchExecution
Creating Instance¶
StreamExecution takes the following to be created:

- SparkSession (Spark SQL)
- Name of the streaming query (can be null)
- Path of the checkpoint directory (metadata directory)
- Streaming query (not used due to logicalPlan)
- Sink
- Trigger
- Clock
- OutputMode
- deleteCheckpointOnStop flag (whether to delete the checkpoint directory on stop)
Abstract Class
StreamExecution is an abstract class and cannot be created directly. It is created indirectly for the concrete StreamExecutions.
Sink¶
sink: Table
StreamExecution is given a Table (Spark SQL) when created.

The Table represents the sink this streaming query writes to.

sink is part of the ProgressReporter abstraction.
Starting Streaming Query¶
start(): Unit
start starts a stream execution thread to run stream processing.

start prints out the following INFO message to the logs (with the prettyId and the checkpointRoot):

Starting [prettyId]. Use [checkpointRoot] to store the query checkpoint.

start then starts the stream execution thread as a daemon thread (in its own execution thread on the JVM).

Note
start uses Java's Thread.start to run the streaming query on a separate execution thread.

In the end, start pauses the main thread (using the startLatch) until StreamExecution is requested to run the streaming query (which sends a QueryStartedEvent to all streaming listeners followed by decrementing the count of the startLatch).

start is used when:

- StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of the streaming query)
Configuration Properties¶
s.s.s.minBatchesToRetain¶
StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).
s.s.s.pollingDelay¶
StreamExecution uses the spark.sql.streaming.pollingDelay configuration property to control how long to delay polling for new data (when no data was available to process in a batch).
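Both properties can be set on the current session before starting a streaming query. A minimal sketch (the values are arbitrary examples):

```scala
// Retain more entries in the offset and commit logs
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "200")
// Wait 50ms before polling again when a batch had no data to process
spark.conf.set("spark.sql.streaming.pollingDelay", "50ms")
```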
ProgressReporter¶
StreamExecution is a ProgressReporter and reports the status of the streaming query (when it starts, progresses and terminates) by posting StreamingQueryListener events.
```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .text("server-logs")
  .writeStream
  .format("console")
  .queryName("debug")
  .trigger(Trigger.ProcessingTime(20.seconds))
  .start

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
```

```text
17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
```
Unique Data Streams¶
uniqueSources: Map[SparkDataStream, ReadLimit]
StreamExecution defines the uniqueSources internal registry for the unique SparkDataStreams of a streaming query with their ReadLimits.

The uniqueSources registry is initialized when the stream execution engines are requested for the analyzed logical plan.

uniqueSources is used when:

- MicroBatchExecution is requested to construct the next micro-batch (and gets offsets of the data streams)
- StreamExecution is requested to stop all streaming data streams
Streaming Query Identifiers¶
The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:
- name is optional and user-defined
- id is a UUID that is auto-generated when StreamExecution is created and persisted to the metadata checkpoint file
- runId is a UUID that is auto-generated every time StreamExecution is created
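All three identifiers are also surfaced on the public StreamingQuery API. Using the query q started in the first example:

```scala
println(q.name)  // user-defined; null when not set
println(q.id)    // persisted to the metadata checkpoint file; survives restarts
println(q.runId) // regenerated on every (re)start
```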
Id¶
StreamExecution is uniquely identified by the ID of the streaming query (which is the id of the StreamMetadata).

Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.
Run Id¶
StreamExecution is uniquely identified by the run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned when StreamExecution is created.

runId does not "survive" query restarts and will always be different yet unique (across all active queries).
StreamMetadata¶
streamMetadata: StreamMetadata
StreamExecution creates a StreamMetadata when created.

StreamExecution uses metadata as the filename of the metadata file that is stored (in JSON format) in the checkpoint directory.

The metadata file is used to restore (recover) the id (e.g., when a streaming query is resumed after a failure or a planned stop).
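For illustration, the metadata file can be inspected directly in the checkpoint directory (the path below is a made-up example):

```scala
// Print the persisted StreamMetadata (a one-line JSON with the query id)
import scala.io.Source
val source = Source.fromFile("/tmp/checkpoint/metadata")
println(source.mkString) // e.g. {"id":"..."}
source.close()
```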
Metadata Logs¶
Write-Ahead Offset Log¶
offsetLog: OffsetSeqLog
StreamExecution creates an OffsetSeqLog when created.

offsetLog stores offsets in the offsets subdirectory of the metadata directory.

offsetLog is used as a Write-Ahead Log of Offsets to persist the offsets of the data about to be processed.

The number of entries in the OffsetSeqLog is controlled using the spark.sql.streaming.minBatchesToRetain configuration property.

offsetLog is used when:

- ContinuousExecution stream execution engine is requested to commit an epoch, getStartOffsets, and addOffset
- MicroBatchExecution stream execution engine is requested to populate start offsets and construct (or skip) the next streaming micro-batch
Offset Commit Log¶
StreamExecution uses an offset commit log (CommitLog with the commits metadata checkpoint directory) for streaming batches successfully executed (with a single file per batch with a file name being the batch id) or committed epochs.
Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.
commitLog is used by the stream execution engines:

- MicroBatchExecution is requested to run an activated streaming query (that in turn requests to populate start offsets at the very beginning of the streaming query execution and later regularly every single batch)
- ContinuousExecution is requested to run a streaming query in continuous mode (that in turn requests to retrieve the start offsets at the very beginning of the streaming query execution and later regularly every commit of an epoch)
State of Streaming Query¶
state: AtomicReference[State]
state indicates the internal state of execution of the streaming query (as a java.util.concurrent.atomic.AtomicReference).
ACTIVE¶
StreamExecution has been requested to run stream processing (and is about to run the activated streaming query).
INITIALIZING¶
StreamExecution has been created.
TERMINATED¶
Indicates that:

- MicroBatchExecution has been requested to stop
- ContinuousExecution has been requested to stop
- StreamExecution has been requested to run stream processing (and has finished running the activated streaming query)
RECONFIGURING¶
Used when ContinuousExecution is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)
Creating StreamingWrite¶
createStreamingWrite(
table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite
createStreamingWrite creates a LogicalWriteInfoImpl (with the query ID, the schema of the input LogicalPlan and the given options).

createStreamingWrite requests the given SupportsWrite (Spark SQL) table for a WriteBuilder (for the LogicalWriteInfoImpl).
createStreamingWrite branches based on the OutputMode:

- For Append output mode, createStreamingWrite requests the WriteBuilder to build a StreamingWrite.
- For Complete output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsTruncate (Spark SQL) and requests it to truncate followed by buildForStreaming.
- For Update output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsStreamingUpdate and requests it to update followed by buildForStreaming.
createStreamingWrite is used when:

- MicroBatchExecution and ContinuousExecution stream execution engines are requested for their analyzed logical plans
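A minimal Scala sketch of the OutputMode branching, using the public Data Source V2 write interfaces (SupportsStreamingUpdate is internal, so the Update branch is only hinted at):

```scala
import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical helper that mirrors the branching described above
def buildWrite(builder: WriteBuilder, mode: OutputMode): StreamingWrite = mode match {
  case m if m == OutputMode.Append() =>
    builder.buildForStreaming()
  case m if m == OutputMode.Complete() =>
    // Complete mode truncates the previous results before writing new ones
    builder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
  case _ =>
    // Update mode relies on the internal SupportsStreamingUpdate trait
    sys.error("Update mode requires SupportsStreamingUpdate (internal API)")
}
```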
Offsets by SparkDataStream¶
Available¶
availableOffsets: StreamProgress
availableOffsets is a registry of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and has not yet been committed).

availableOffsets works in tandem with the committedOffsets internal registry.

availableOffsets is empty when StreamExecution is created (i.e. no offsets are reported for any streaming source in the streaming query).
Committed (Processed)¶
committedOffsets: StreamProgress
committedOffsets is a registry of offsets by streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.

committedOffsets works in tandem with the availableOffsets internal registry.
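From the public API, the offset range processed in the most recent trigger is visible per source in the last progress report:

```scala
// lastProgress is null until the first trigger completes
Option(q.lastProgress).foreach { p =>
  p.sources.foreach { s =>
    println(s"${s.description}: start=${s.startOffset} end=${s.endOffset}")
  }
}
```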
Latest¶
latestOffsets: StreamProgress
StreamExecution creates a StreamProgress to track the latest available offsets (by SparkDataStream) when created.

latestOffsets is updated when:

- MicroBatchExecution is requested to constructNextBatch (after getting the latest available offsets from the unique sources)

latestOffsets is used when:

- MicroBatchExecution is requested to runActivatedStream (and recordTriggerOffsets)
- ContinuousExecution is requested to commit an epoch (and recordTriggerOffsets)
Fully-Qualified (Resolved) Path to Checkpoint Root Directory¶
resolvedCheckpointRoot: String
resolvedCheckpointRoot is the fully-qualified path of the given checkpoint root directory.

The checkpoint root directory is defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property with the queryName option.

The checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.

resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

resolvedCheckpointRoot is used for the logicalPlan (while transforming the analyzedPlan and planning StreamingRelation logical operators to the corresponding StreamingExecutionRelation physical operators with the streaming data sources created passing in the path to the sources directory to store checkpointing metadata).

resolvedCheckpointRoot is printed out immediately when resolved as an INFO message to the logs:

Checkpoint root [checkpointRoot] resolved to [resolvedCheckpointRoot].

resolvedCheckpointRoot is printed out again as an INFO message to the logs when StreamExecution is started:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.
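For illustration, the checkpoint root can be given per query with the checkpointLocation option (the directory is an arbitrary example):

```scala
val sq2 = spark.readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .queryName("checkpoint-demo")
  .option("checkpointLocation", "/tmp/checkpoint-demo") // resolved to a fully-qualified path
  .start
```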
StreamWriterCommitProgress¶
sinkCommitProgress: Option[StreamWriterCommitProgress]
sinkCommitProgress is part of the ProgressReporter abstraction.

StreamExecution initializes the sinkCommitProgress registry to None when created.
Last Incremental QueryExecution¶
lastExecution: IncrementalExecution
lastExecution is part of the ProgressReporter abstraction.

lastExecution is an IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.

lastExecution is created (and requested for the "executable" SparkPlan) when the stream execution engines are requested for the following:

- MicroBatchExecution is requested to run a single streaming micro-batch (when in the queryPlanning Phase)
- ContinuousExecution is requested to run a streaming query (when in the queryPlanning Phase)

lastExecution is the QueryExecution of the DataFrame with new data for every micro-batch in MicroBatchExecution.

lastExecution is used when:

- MicroBatchExecution is requested to construct or skip the next streaming micro-batch (if there are StateStoreWriters in a streaming query) and run a single streaming micro-batch (when in the addBatch Phase and updating the watermark and committing offsets to the offset commit log)
- StreamExecution is requested to explain a streaming query (using explainInternal)
- ContinuousExecution is requested to run a streaming query (when in the runContinuous Phase)
- For debugging query execution of a streaming query (codegenString, codegenStringSeq)
Explaining Streaming Query¶
explain(): Unit // (1)!
explain(extended: Boolean): Unit

1. Turns the extended flag off (false)

explain simply prints out explainInternal to the standard output.
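explain is also available on the public StreamingQuery API, e.g. for the query started earlier:

```scala
q.explain()                // extended flag off: physical plan only
q.explain(extended = true) // include the logical plans as well
```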
Stopping Streaming Sources and Readers¶
stopSources(): Unit
stopSources requests every streaming source to stop.

In case of a non-fatal exception, stopSources prints out the following WARN message to the logs:

Failed to stop streaming source: [source]. Resources may have leaked.

stopSources is used when:

- StreamExecution is requested to run stream processing (and terminates, successfully or not)
- ContinuousExecution is requested to run a streaming query in continuous mode (and terminates)
Running Stream Processing¶
runStream(): Unit
runStream simply prepares the environment to execute the activated streaming query.

runStream is used when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).

Internally, runStream sets the job group (for all the Spark jobs started by this thread) as follows:

- runId for the job group ID
- getBatchDescriptionString for the job group description (to display in the web UI)
- interruptOnCancel flag on

Note
runStream uses the SparkSession to access the SparkContext and assign the job group id.
Learn more about the SparkContext.setJobGroup method in The Internals of Apache Spark online book.
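For reference, this is what such an assignment looks like with the public SparkContext API (the literal values stand in for the runId and getBatchDescriptionString):

```scala
spark.sparkContext.setJobGroup(
  groupId = "run-id-uuid",        // runId of the streaming query
  description = "streaming demo", // getBatchDescriptionString
  interruptOnCancel = true)
```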
runStream sets the sql.streaming.queryId local property to the id.

runStream requests the MetricsSystem to register the MetricsReporter when the spark.sql.streaming.metricsEnabled configuration property is enabled.

runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with the id, runId, and name).

runStream unblocks the main starting thread (by decrementing the count of the startLatch which, when 0, lets the starting thread continue).

runStream updates the status message to Initializing sources.

runStream initializes the analyzed logical plan.

Lazy Value
The analyzed logical plan is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

runStream disables adaptive query execution and cost-based join optimization (by turning the spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).

runStream creates a new "zero" OffsetSeqMetadata.

(when in INITIALIZING state) runStream enters the ACTIVE state:

- Decrements the count of the initializationLatch

Note
runStream does the main work only when first started (when in INITIALIZING state).

runStream ...FIXME (describe the failed and stop states)

Once the TriggerExecutor has finished executing batches, runStream updates the status message to Stopped.

NOTE: The TriggerExecutor finishes executing batches when the batch runner returns whether the streaming query is stopped or not (while active).
finally Block¶
runStream releases the startLatch and initializationLatch latches.

runStream then stops the streaming sources (stopSources).

runStream enters the TERMINATED state.

runStream sets the StreamingQueryStatus with the isTriggerActive and isDataAvailable flags off (false).

runStream removes the stream metrics reporter from the application's MetricsSystem.

runStream requests the StreamingQueryManager to handle termination of a streaming query.

runStream creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.

With the deleteCheckpointOnStop flag enabled and no StreamingQueryException, runStream deletes the checkpoint directory.

In the end, runStream releases the terminationLatch latch.
TriggerExecutor's Batch Runner¶
The batch runner (batchRunner) is an executable block executed by the TriggerExecutor in runStream.

batchRunner starts trigger calculation.

As long as the query is not stopped (i.e. the state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In the triggerExecution time-tracking section, batchRunner branches off per the currentBatchId:

- For currentBatchId < 0:
  1. populateStartOffsets
  2. Setting the job description as getBatchDescriptionString

Stream running from [committedOffsets] to [availableOffsets]

- For currentBatchId >= 0: constructing the next streaming micro-batch

If there is data available in the sources, batchRunner marks the currentStatus with isDataAvailable enabled.
Tip
You can check out the status of a streaming query using the status method.

```scala
scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
```
batchRunner then updates the status message to Processing new data and runs the current streaming batch.

After the triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was data available in the sources, batchRunner updates the committed offsets (by adding the current batch to the commit log and adding the availableOffsets to the committedOffsets).

batchRunner prints out the following DEBUG message to the logs:

batch [currentBatchId] committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

- Marks the currentStatus with isDataAvailable disabled
- Updates the status message to Waiting for data to arrive
- Sleeps the current thread for pollingDelayMs milliseconds

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so the TriggerExecutor can decide whether to finish executing the batches or not).
Path to Checkpoint Directory¶
checkpointFile(
name: String): String
checkpointFile gives the path of a directory with the given name in the checkpoint directory.

checkpointFile is used for the streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).
Posting StreamingQueryListener Event¶
postEvent(
event: StreamingQueryListener.Event): Unit
postEvent is a part of the ProgressReporter abstraction.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

Note
postEvent uses the SparkSession to access the current StreamingQueryManager.

postEvent is used when:

- ProgressReporter is requested to report update progress (while finishing a trigger)
- StreamExecution runs streaming batches (and announces starting a streaming query by posting a QueryStartedEvent and query termination by posting a QueryTerminatedEvent)
Waiting Until No New Data Available in Sources or Query Has Been Terminated¶
processAllAvailable(): Unit
processAllAvailable is a part of the StreamingQuery abstraction.

processAllAvailable reports the streamDeathCause exception if the streaming query died with it (and returns immediately).

NOTE: streamDeathCause is reported exclusively when StreamExecution is requested to run stream processing (and it failed with an exception).

processAllAvailable returns immediately when StreamExecution is no longer active (in TERMINATED state).

processAllAvailable acquires a lock on the awaitProgressLock and turns the noNewData internal flag off (false).

processAllAvailable keeps polling with 10-second pauses (locked on awaitProgressLockCondition) until the noNewData flag is turned on (true) or StreamExecution is no longer active (in TERMINATED state).

NOTE: The 10-second pause is hardcoded and cannot be changed.

In the end, processAllAvailable releases the awaitProgressLock lock.

processAllAvailable throws an IllegalStateException when executed on the stream execution thread:

Cannot wait for a query state from the same thread that is running the query
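processAllAvailable is mostly useful in tests to block until everything added so far has been committed. A sketch with the (internal) MemoryStream source and an example table name:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlCtx = spark.sqlContext
val input = MemoryStream[Int]
val mq = input.toDF.writeStream.format("memory").queryName("demo_table").start

input.addData(1, 2, 3)
mq.processAllAvailable() // returns once 1, 2, 3 have been processed and committed
spark.table("demo_table").show
```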
Stream Execution Thread¶
queryExecutionThread: QueryExecutionThread
queryExecutionThread is a Java thread of execution (java.util.Thread) that runs a streaming query.

queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start. At that time, start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

When started, queryExecutionThread sets the call site and runs the streaming query.

queryExecutionThread uses the name stream execution thread for [prettyIdString] (with prettyIdString made up of the optional queryName and [id = [id], runId = [runId]]).

queryExecutionThread is a QueryExecutionThread, a custom UninterruptibleThread from Apache Spark with a runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().
Tip
Use Java's jconsole or jstack to monitor stream execution threads.
```text
$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...
```
Batch Metadata¶
offsetSeqMetadata: OffsetSeqMetadata
offsetSeqMetadata is part of the ProgressReporter abstraction.

StreamExecution creates an OffsetSeqMetadata when created.

Most importantly, the OffsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.
offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.

NOTE: MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in the walCommit phase of MicroBatchExecution (and printed out as an INFO message to the logs).

offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to populate start offsets.
isActive¶
isActive: Boolean
isActive is part of the StreamingQuery abstraction.

isActive is enabled (true) as long as the State is not TERMINATED.
Human-Readable HTML Description of Spark Jobs (for web UI)¶
getBatchDescriptionString: String
getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name (if defined), the id, the runId and the batchDescription (that can be init, for the current batch ID negative, or the current batch ID itself).

getBatchDescriptionString is of the following format:

[name]
id = [id]
runId = [runId]
batch = [batchDescription]
getBatchDescriptionString is used when:

- MicroBatchExecution stream execution engine is requested to run an activated streaming query (as the job description of any Spark jobs triggered as part of query execution)
- StreamExecution is requested to run stream processing (as the job group description of any Spark jobs triggered as part of query execution)
No New Data Available¶
noNewData: Boolean
noNewData is a flag that indicates that a batch has completed with no new data left, so processAllAvailable can stop waiting until all the streaming data is processed.

Default: false

Turned on (true) when:

- MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (while running an activated streaming query)
- ContinuousExecution stream execution engine is requested to commit an epoch

Turned off (false) when:

- MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (right after the walCommit phase)
- StreamExecution is requested to processAllAvailable
Current Batch ID¶
- -1 when StreamExecution is created
- 0 when StreamExecution populates start offsets (and the OffsetSeqLog is empty, i.e. no offset files in the offsets directory in the checkpoint)
- Incremented when StreamExecution runs streaming batches and finishes a trigger that had data available from the sources (right after committing the batch)
newData Registry¶
newData: Map[BaseStreamingSource, LogicalPlan]
newData is a registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

newData is part of the ProgressReporter abstraction.

newData is set when StreamExecution is requested to request unprocessed data from the streaming sources (while running a single streaming batch).
MetricsReporter¶
StreamExecution defines a MetricsReporter to report streaming metrics.

The MetricsReporter is created with the following source name (with the name if defined, or the id):

spark.streaming.[name or id]

The MetricsReporter is registered only when the spark.sql.streaming.metricsEnabled configuration property is enabled (when StreamExecution is requested to run).

The MetricsReporter is deactivated (removed) when a streaming query is stopped (when StreamExecution is requested to runStream).
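The property is off by default and can be enabled before starting a query:

```scala
// Register the MetricsReporter with the application's MetricsSystem
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```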
Latches¶
StreamExecution uses java.util.concurrent.CountDownLatch instances (each with count 1).
initializationLatch¶
Counted down when requested to runStream:

- When changing state from INITIALIZING to ACTIVE (just before runActivatedStream)
- In runStream's finally block

Awaited for tests only (which seems to indicate that it is a test-only latch)
startLatch¶
Counted down when requested to runStream.

Awaited when requested to start (to pause the main thread until StreamExecution was requested to run the streaming query on a separate thread).
terminationLatch¶
Counted down at the end of runStream.

Awaited when requested to awaitTermination (that pauses the thread until the streaming query has finished, successfully or not).
Locks¶
awaitProgressLock¶
StreamExecution uses a fair reentrant mutual-exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention).
__is_continuous_processing Local Property¶
StreamExecution uses the __is_continuous_processing local property (default: false) to differentiate between ContinuousExecution (true) and MicroBatchExecution (false), which is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).
Demo¶
```scala
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
```
Logging¶
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL
```
Refer to Logging.