StreamingQueryManager¶
StreamingQueryManager is the management interface for active streaming queries of a SparkSession.
StreamingQueryManager is used (internally) to create a StreamingQuery (and its StreamExecution).
Creating Instance¶
StreamingQueryManager takes the following to be created:
- SparkSession
StreamingQueryManager is created when SessionState is requested for one.
Tip
Learn more about SessionState in The Internals of Spark SQL online book.
All Active Streaming Queries¶
active: Array[StreamingQuery]
Returns all active streaming queries (from the activeQueries internal registry).
Registering StreamingQueryListener¶
addListener(
listener: StreamingQueryListener): Unit
Registers (adds) a StreamingQueryListener.
Awaiting Any Termination¶
awaitAnyTermination(): Unit
awaitAnyTermination(
timeoutMs: Long): Boolean
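Waits until any streaming query terminates or timeoutMs elapses. The variant with timeoutMs returns whether any query has terminated. If the terminated query failed, awaitAnyTermination rethrows its exception.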
Getting Active StreamingQuery by ID¶
get(
id: String): StreamingQuery
get(
id: UUID): StreamingQuery
Gets the active StreamingQuery by id (or null when no active query has that id).
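The two get overloads differ only in how the id is given. The following is a minimal plain-Scala sketch of that lookup, assuming a String id is parsed into a UUID and then looked up in a registry of active queries. QueryHandle and ActiveQueryLookup are hypothetical names used for illustration and are not Spark classes.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for a StreamingQuery: only the id and name matter here.
final case class QueryHandle(id: UUID, name: String)

// Simplified model (not Spark's code) of looking up active queries by id.
class ActiveQueryLookup {
  private val activeQueries = mutable.HashMap.empty[UUID, QueryHandle]
  private val lock = new Object

  def register(q: QueryHandle): Unit = lock.synchronized { activeQueries.put(q.id, q) }

  // Returns the active query with the given id, or null when there is none.
  def get(id: UUID): QueryHandle = lock.synchronized { activeQueries.get(id).orNull }

  // The String variant parses the id and delegates to the UUID variant;
  // a malformed string makes UUID.fromString throw IllegalArgumentException.
  def get(id: String): QueryHandle = get(UUID.fromString(id))
}
```

Returning null instead of an Option mirrors the Java-friendly shape of the signatures shown above.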
Deregistering StreamingQueryListener¶
removeListener(
listener: StreamingQueryListener): Unit
De-registers (removes) the StreamingQueryListener.
Resetting Terminated Queries¶
resetTerminated(): Unit
Resets the internal registry of the terminated streaming queries (so that awaitAnyTermination can be used again to wait for new terminations).
Accessing StreamingQueryManager¶
StreamingQueryManager is available using the SparkSession.streams property.
scala> :type spark
org.apache.spark.sql.SparkSession
scala> :type spark.streams
org.apache.spark.sql.streaming.StreamingQueryManager
StreamingQueryListenerBus¶
listenerBus: StreamingQueryListenerBus
listenerBus is a StreamingQueryListenerBus (for the current SparkSession) that is created immediately when StreamingQueryManager is created.
listenerBus is used for the following:
- Registering or de-registering a given StreamingQueryListener
- Posting a streaming event (and notifying the registered StreamingQueryListeners about the event)
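Both responsibilities of a listener bus can be seen in a small model. This is a minimal, synchronous listener bus in plain Scala. SimpleQueryListenerBus, QueryListener and the event classes are hypothetical names and not Spark's API. Spark's own StreamingQueryListenerBus is also wired into the SparkContext's listener bus, and this sketch leaves that part out.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical event and listener types (Spark's are StreamingQueryListener.Event
// and StreamingQueryListener).
sealed trait QueryEvent
final case class QueryStarted(name: String) extends QueryEvent
final case class QueryTerminated(name: String) extends QueryEvent

trait QueryListener {
  def onEvent(event: QueryEvent): Unit
}

// Simplified, synchronous bus: every post is delivered on the caller's thread.
class SimpleQueryListenerBus {
  // Copy-on-write so listeners can be added or removed while events are posted.
  private val listeners = new CopyOnWriteArrayList[QueryListener]()

  def addListener(l: QueryListener): Unit = listeners.add(l)
  def removeListener(l: QueryListener): Unit = listeners.remove(l)

  // Notifies every currently registered listener about the event.
  def post(event: QueryEvent): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) it.next().onEvent(event)
  }
}
```

A copy-on-write list is one simple way to let registration and posting happen concurrently without explicit locking. Real buses typically also isolate failures of individual listeners.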
Registering StreamingQueryListener¶
addListener(
listener: StreamingQueryListener): Unit
addListener requests the StreamingQueryListenerBus to add the input StreamingQueryListener.
De-Registering StreamingQueryListener¶
removeListener(
listener: StreamingQueryListener): Unit
removeListener requests the StreamingQueryListenerBus to remove the input StreamingQueryListener.
Creating Streaming Query¶
createQuery(
userSpecifiedName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
extraOptions: Map[String, String],
sink: BaseStreamingSink,
outputMode: OutputMode,
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
trigger: Trigger,
triggerClock: Clock): StreamingQueryWrapper
createQuery creates a StreamingQueryWrapper (for a StreamExecution per the input user-defined properties).
Internally, createQuery first finds the name of the checkpoint directory of a query (aka checkpoint location) in the following order:
- Exactly the input userSpecifiedCheckpointLocation if defined
- spark.sql.streaming.checkpointLocation Spark property if defined, used as the parent directory with a subdirectory per the optional userSpecifiedName (or a randomly-generated UUID)
- (only when useTempCheckpointLocation is enabled) A temporary directory (as specified by the java.io.tmpdir JVM property) with a subdirectory with the temporary prefix
Note
userSpecifiedCheckpointLocation can be any path that is accepted by Hadoop's Path.
If the directory name for the checkpoint location could not be found, createQuery reports an AnalysisException:
checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)
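The three-step lookup order above can be expressed as a chain of Options. This plain-Scala sketch makes two simplifying assumptions: paths are plain Strings resolved with java.nio rather than Hadoop's Path, and an IllegalArgumentException stands in for Spark's AnalysisException. resolveCheckpointLocation and its confCheckpointLocation parameter are hypothetical names.

```scala
import java.nio.file.{Files, Paths}
import java.util.UUID

// Simplified model of the checkpoint location lookup (not Spark's code).
def resolveCheckpointLocation(
    userSpecifiedCheckpointLocation: Option[String],
    confCheckpointLocation: Option[String], // value of spark.sql.streaming.checkpointLocation
    userSpecifiedName: Option[String],
    useTempCheckpointLocation: Boolean): String =
  // 1. The user-specified location is used exactly as given.
  userSpecifiedCheckpointLocation
    .orElse {
      // 2. The session-wide parent directory with a subdirectory per query name
      //    (or a random UUID for an unnamed query).
      confCheckpointLocation.map { parent =>
        Paths.get(parent, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
      }
    }
    .getOrElse {
      if (useTempCheckpointLocation) {
        // 3. A fresh directory under java.io.tmpdir whose name starts with "temporary".
        Files.createTempDirectory("temporary").toString
      } else {
        throw new IllegalArgumentException(
          "checkpointLocation must be specified either through option(\"checkpointLocation\", ...) " +
            "or SparkSession.conf.set(\"spark.sql.streaming.checkpointLocation\", ...)")
      }
    }
```

Because orElse and getOrElse take by-name arguments, a later step (such as creating a temporary directory) only runs when every earlier step produced nothing.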
createQuery reports an AnalysisException when the input recoverFromCheckpointLocation flag is turned off but there is an offsets directory in the checkpoint location.
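The check described above can be sketched with java.nio standing in for Hadoop's FileSystem API. The function name ensureNoRecovery is hypothetical. The exception type and message are also illustrative, since Spark reports an AnalysisException with its own wording.

```scala
import java.nio.file.{Files, Path, Paths}

// Simplified model (not Spark's code): refuse to reuse a checkpoint location that
// already holds offsets when recovery from it is not allowed.
def ensureNoRecovery(checkpointLocation: String, recoverFromCheckpointLocation: Boolean): Unit =
  if (!recoverFromCheckpointLocation) {
    val offsets: Path = Paths.get(checkpointLocation, "offsets")
    if (Files.exists(offsets)) {
      // Illustrative message only.
      throw new IllegalStateException(s"Recovery is disabled but $offsets already exists")
    }
  }
```

An existing offsets directory signals that an earlier run already made progress, so starting a query that cannot recover would silently conflict with that state.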
createQuery makes sure that the logical plan of the structured query is analyzed (i.e. no logical errors have been found).
When the spark.sql.streaming.unsupportedOperationCheck configuration property is enabled (the default), createQuery checks the logical plan of the streaming query for unsupported operations.
(only when the spark.sql.adaptive.enabled Spark property is turned on) createQuery prints out the following WARN message to the logs:
spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
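In the end, createQuery creates a StreamingQueryWrapper with a new MicroBatchExecution (or a ContinuousExecution when the trigger is a continuous trigger and the sink supports continuous processing).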
createQuery is used when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of a streaming query).
recoverFromCheckpointLocation¶
The recoverFromCheckpointLocation flag corresponds to the recoverFromCheckpointLocation input flag of startQuery, which is enabled by default (startQuery is in fact the only place where createQuery is used). The flag is set per sink as follows:
- memory sink has the flag enabled for Complete output mode only
- foreach sink has the flag always enabled
- console sink has the flag always disabled
- all other sinks have the flag always enabled
userSpecifiedName¶
userSpecifiedName corresponds to the queryName option (that can be defined using DataStreamWriter's queryName method), while userSpecifiedCheckpointLocation is the checkpointLocation option.
Starting Streaming Query Execution¶
startQuery(
userSpecifiedName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
extraOptions: Map[String, String],
sink: BaseStreamingSink,
outputMode: OutputMode,
useTempCheckpointLocation: Boolean = false,
recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery
startQuery starts a streaming query and returns a handle to it.
Internally, startQuery first creates a StreamingQueryWrapper, registers it in the activeQueries internal registry (by the id), requests it for the underlying StreamExecution and starts it.
In the end, startQuery returns the StreamingQueryWrapper (as part of the fluent API so you can chain operators) or throws the exception that was reported when attempting to start the query.
startQuery throws an IllegalArgumentException when another active query is already registered under the same name (startQuery looks it up in the activeQueries internal registry):
Cannot start query with name [name] as a query with that name is already active
startQuery throws an IllegalStateException when a query with the same id is already active, e.g. when a query is started again from a checkpoint while it is still running (startQuery looks the id up in the activeQueries internal registry):
Cannot start query with id [id] as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
startQuery is used when DataStreamWriter is requested to start an execution of the streaming query.
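The registration rules of startQuery can be modeled in a small plain-Scala registry. The model rejects a duplicate name with an IllegalArgumentException and a duplicate id with an IllegalStateException, then registers the query and starts it. It assumes, as a simplification, that a query whose start fails is deregistered before the exception is rethrown. SketchQuery and QueryRegistry are hypothetical names, not Spark classes.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryWrapper; `start` simulates starting the
// underlying StreamExecution and may throw.
final class SketchQuery(val id: UUID, val name: Option[String], start: () => Unit) {
  def startExecution(): Unit = start()
}

// Simplified model (not Spark's code) of the registration logic of startQuery.
class QueryRegistry {
  private val activeQueries = mutable.HashMap.empty[UUID, SketchQuery]
  private val lock = new Object

  def startQuery(query: SketchQuery): SketchQuery = {
    lock.synchronized {
      // No other active query may use the same name...
      query.name.foreach { name =>
        if (activeQueries.values.exists(_.name.contains(name))) {
          throw new IllegalArgumentException(
            s"Cannot start query with name $name as a query with that name is already active")
        }
      }
      // ...nor the same id (e.g. restarting a still-running query from its checkpoint).
      if (activeQueries.contains(query.id)) {
        throw new IllegalStateException(
          s"Cannot start query with id ${query.id} as another query with same id is already active. " +
            "Perhaps you are attempting to restart a query from checkpoint that is already active.")
      }
      activeQueries.put(query.id, query)
    }
    // Start outside the lock; on failure, deregister and rethrow the reported exception.
    try query.startExecution()
    catch {
      case e: Throwable =>
        lock.synchronized { activeQueries -= query.id }
        throw e
    }
    query
  }

  def active: Seq[SketchQuery] = lock.synchronized { activeQueries.values.toSeq }
}
```

Starting the execution outside the lock keeps a slow or misbehaving start from blocking other callers of the registry.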
Posting StreamingQueryListener Event to StreamingQueryListenerBus¶
postListenerEvent(
event: StreamingQueryListener.Event): Unit
postListenerEvent simply posts the input event to the internal event bus for streaming events (StreamingQueryListenerBus).
postListenerEvent is used when StreamExecution is requested to post a streaming event.
Handling Termination of Streaming Query (and Deactivating Query in StateStoreCoordinator)¶
notifyQueryTermination(
terminatedQuery: StreamingQuery): Unit
notifyQueryTermination removes the terminatedQuery from the activeQueries internal registry (by the query id).
notifyQueryTermination records the terminatedQuery in the lastTerminatedQuery internal registry when no earlier streaming query has been recorded or the terminatedQuery terminated due to an exception.
notifyQueryTermination notifies all threads blocked on awaitTerminationLock (e.g. in awaitAnyTermination).
In the end, notifyQueryTermination requests the StateStoreCoordinator to deactivate all active runs of the streaming query.
notifyQueryTermination is used when StreamExecution is requested to run a streaming query and the query has finished running streaming batches (with or without an exception).
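The interplay between notifyQueryTermination, awaitAnyTermination and resetTerminated can be modeled with a lock and wait/notifyAll. This plain-Scala sketch uses FinishedQuery and TerminationTracker, which are hypothetical names, and it omits the final StateStoreCoordinator step. It shows the recording rule: the first termination is kept, but a later failure overrides an earlier clean stop.

```scala
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.mutable

// Hypothetical stand-in for a terminated StreamingQuery.
final case class FinishedQuery(id: UUID, exception: Option[Throwable])

// Simplified model (not Spark's code) of the termination bookkeeping.
class TerminationTracker {
  private val activeQueries = mutable.Set.empty[UUID]
  private val activeQueriesLock = new Object
  private val awaitTerminationLock = new Object
  private var lastTerminatedQuery: FinishedQuery = null

  def register(id: UUID): Unit = activeQueriesLock.synchronized { activeQueries += id }
  def isActive(id: UUID): Boolean = activeQueriesLock.synchronized { activeQueries.contains(id) }

  def notifyQueryTermination(terminated: FinishedQuery): Unit = {
    // 1. Deregister the query.
    activeQueriesLock.synchronized { activeQueries -= terminated.id }
    awaitTerminationLock.synchronized {
      // 2. Keep the earliest termination unless this one failed.
      if (lastTerminatedQuery == null || terminated.exception.nonEmpty) {
        lastTerminatedQuery = terminated
      }
      // 3. Wake up threads blocked in awaitAnyTermination.
      awaitTerminationLock.notifyAll()
    }
  }

  // Returns whether any query terminated within timeoutMs; rethrows a recorded failure.
  def awaitAnyTermination(timeoutMs: Long): Boolean = awaitTerminationLock.synchronized {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    while (lastTerminatedQuery == null && System.nanoTime() < deadline) {
      awaitTerminationLock.wait(10)
    }
    if (lastTerminatedQuery != null) lastTerminatedQuery.exception.foreach(e => throw e)
    lastTerminatedQuery != null
  }

  // Forgets past terminations so that awaitAnyTermination waits for new ones.
  def resetTerminated(): Unit = awaitTerminationLock.synchronized { lastTerminatedQuery = null }
}
```

The short wait(10) loop re-checks the deadline periodically, so a missed notification can delay a waiter by at most a few milliseconds.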
Active Streaming Queries by ID¶
Registry of StreamingQueries per id (UUID)
Used when StreamingQueryManager is requested for the active streaming queries, to get a streaming query by id, or to start a streaming query, and when it is notified that a streaming query has terminated.
Last-Terminated Streaming Query¶
StreamingQuery that has recently terminated (i.e. was stopped or failed with an exception).
null when no streaming query has terminated yet or after resetTerminated.
- Used in awaitAnyTermination to know when a streaming query has terminated
- Set when StreamingQueryManager is notified that a streaming query has terminated
StateStoreCoordinatorRef¶
StateStoreCoordinatorRef to the StateStoreCoordinator RPC Endpoint
Used when:
- StreamingQueryManager is notified that a streaming query has terminated
- Stateful operators are executed (FlatMapGroupsWithStateExec, StateStoreRestoreExec, StateStoreSaveExec, StreamingDeduplicateExec and StreamingSymmetricHashJoinExec)
- StateStoreRDD is created (with storeUpdateFunction aborting StateStore when a task fails)