
StreamingQueryManager

StreamingQueryManager is the management interface for active streaming queries of a SparkSession.

StreamingQueryManager is used (internally) to create a StreamingQuery (and its StreamExecution).

Figure: StreamingQueryManager Creates StreamingQuery (and StreamExecution)

Creating Instance

StreamingQueryManager takes the following to be created:

  • SparkSession

StreamingQueryManager is created when SessionState is requested for one.

Tip

Learn more about SessionState in The Internals of Spark SQL online book.

StreamingQueryManager

All Active Streaming Queries

active: Array[StreamingQuery]

Active streaming queries

Registering StreamingQueryListener

addListener(
  listener: StreamingQueryListener): Unit

Registers (adds) a StreamingQueryListener

Awaiting Any Termination

awaitAnyTermination(): Unit
awaitAnyTermination(
  timeoutMs: Long): Boolean

Waits until any streaming query terminates or timeoutMs elapses

Getting Active StreamingQuery by ID

get(
  id: String): StreamingQuery
get(
  id: UUID): StreamingQuery

Gets the StreamingQuery by id

Deregistering StreamingQueryListener

removeListener(
  listener: StreamingQueryListener): Unit

De-registers (removes) the StreamingQueryListener

Resetting Terminated Queries

resetTerminated(): Unit

Resets the internal registry of terminated streaming queries (so that awaitAnyTermination can be used again)
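
The operators above can be exercised together. The following is a minimal sketch, assuming a local SparkSession and the built-in rate source and memory sink. The object name, the query name and the 5-second timeout are illustrative only and not part of the API.

```scala
import org.apache.spark.sql.SparkSession

object StreamsApiDemo {
  // Returns (was the query listed as active?, did awaitAnyTermination observe a termination?)
  def run(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("streams-api-demo") // illustrative name
      .getOrCreate()
    val manager = spark.streams

    // Start a throwaway query: rate source into the memory sink
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("memory")
      .queryName("rate_demo") // illustrative name
      .start()

    // The query is listed among the active queries and can be looked up by id
    val listed = manager.active.exists(_.id == query.id)
    val found = manager.get(query.id)
    println(s"Found query: ${found.name}")

    query.stop()

    // Wait up to 5 seconds for any query to terminate
    val terminated = manager.awaitAnyTermination(5000)

    // Forget terminated queries so that awaitAnyTermination can block again
    manager.resetTerminated()

    spark.stop()
    (listed, terminated)
  }
}
```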

Accessing StreamingQueryManager

StreamingQueryManager is available as the SparkSession.streams property.

scala> :type spark
org.apache.spark.sql.SparkSession

scala> :type spark.streams
org.apache.spark.sql.streaming.StreamingQueryManager

StreamingQueryListenerBus

listenerBus: StreamingQueryListenerBus

listenerBus is a StreamingQueryListenerBus (for the current SparkSession) that is created immediately when StreamingQueryManager is created.

listenerBus is used to register and de-register StreamingQueryListeners and to post streaming events.

Registering StreamingQueryListener

addListener(
  listener: StreamingQueryListener): Unit

addListener requests the StreamingQueryListenerBus to add the input StreamingQueryListener.

De-Registering StreamingQueryListener

removeListener(
  listener: StreamingQueryListener): Unit

removeListener requests StreamingQueryListenerBus to remove the input StreamingQueryListener.
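
To illustrate registering and de-registering a listener, here is a minimal sketch. LoggingListener and ListenerDemo are hypothetical names, and the listener only prints the events it receives.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener that only logs lifecycle events
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started: id=${event.id} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: batchId=${event.progress.batchId}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Terminated: id=${event.id} exception=${event.exception}")
}

object ListenerDemo {
  // Registers the listener, runs the given block, and always de-registers afterwards
  def withListener[T](spark: SparkSession)(body: => T): T = {
    val listener = new LoggingListener
    spark.streams.addListener(listener)
    try body
    finally spark.streams.removeListener(listener)
  }
}
```

Keeping a reference to the listener instance matters, since removeListener takes the same instance that was passed to addListener.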

Creating Streaming Query

createQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  extraOptions: Map[String, String],
  sink: BaseStreamingSink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean,
  recoverFromCheckpointLocation: Boolean,
  trigger: Trigger,
  triggerClock: Clock): StreamingQueryWrapper

createQuery creates a StreamingQueryWrapper (for a StreamExecution per the input user-defined properties).

Internally, createQuery first finds the name of the checkpoint directory of a query (aka checkpoint location) in the following order:

  1. Exactly the input userSpecifiedCheckpointLocation if defined

  2. The spark.sql.streaming.checkpointLocation Spark property, if defined, as the parent directory, with a subdirectory named after the optional userSpecifiedName (or a randomly generated UUID)

  3. (only when useTempCheckpointLocation is enabled) A subdirectory with the temporary prefix under the temporary directory (as specified by the java.io.tmpdir JVM property)

Note

userSpecifiedCheckpointLocation can be any path that is accepted by Hadoop's Path.

If the directory name for the checkpoint location cannot be determined, createQuery reports an AnalysisException with the following message:

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)
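
The lookup order above can be sketched in plain Scala. This is a simplified model, not the Spark code: resolve and sessionCheckpointLocation are hypothetical names, paths are handled as plain strings rather than Hadoop Paths, and an IllegalArgumentException stands in for the AnalysisException.

```scala
import java.nio.file.Files
import java.util.UUID

object CheckpointLocationSketch {
  // Hypothetical helper that mirrors the lookup order described above.
  // sessionCheckpointLocation stands for spark.sql.streaming.checkpointLocation.
  def resolve(
      userSpecifiedCheckpointLocation: Option[String],
      sessionCheckpointLocation: Option[String],
      userSpecifiedName: Option[String],
      useTempCheckpointLocation: Boolean): String =
    userSpecifiedCheckpointLocation
      .orElse {
        // Parent directory from the session configuration plus a per-query subdirectory
        sessionCheckpointLocation.map { parent =>
          val child = userSpecifiedName.getOrElse(UUID.randomUUID().toString)
          s"${parent.stripSuffix("/")}/$child"
        }
      }
      .getOrElse {
        if (useTempCheckpointLocation) {
          // Temporary directory under java.io.tmpdir with the "temporary" prefix
          Files.createTempDirectory("temporary").toFile.getCanonicalPath
        } else {
          // Stand-in for the AnalysisException that createQuery reports
          throw new IllegalArgumentException("checkpointLocation must be specified")
        }
      }
}
```

In this sketch, an explicit checkpoint location always wins, and the temporary directory is created only when neither of the first two options is available.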

createQuery reports an AnalysisException when the input recoverFromCheckpointLocation flag is turned off but there is an offsets directory in the checkpoint location.

createQuery makes sure that the logical plan of the structured query is analyzed (i.e. no logical errors have been found).

When the spark.sql.streaming.unsupportedOperationCheck configuration property is enabled (the default), createQuery checks the logical plan of the streaming query for unsupported operations.

(only when spark.sql.adaptive.enabled Spark property is turned on) createQuery prints out a WARN message to the logs:

spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

In the end, createQuery creates a StreamingQueryWrapper with a new MicroBatchExecution.

createQuery is used when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of a streaming query).

recoverFromCheckpointLocation

The recoverFromCheckpointLocation flag of createQuery corresponds to the recoverFromCheckpointLocation flag of startQuery, which is enabled by default (startQuery is in fact the only place where createQuery is used). The value of the flag depends on the sink:

  • memory sink has the flag enabled for Complete output mode only

  • foreach sink has the flag always enabled

  • console sink has the flag always disabled

  • all other sinks have the flag always enabled
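
The rules above can be summarized as a small function. This is only a sketch: the helper is hypothetical, sinks are identified by their format names, and the output mode is given as a lowercase string.

```scala
object RecoverFlagSketch {
  // Hypothetical helper that encodes the per-sink rules listed above
  def recoverFromCheckpointLocation(sink: String, outputMode: String): Boolean =
    sink match {
      case "memory"  => outputMode == "complete" // enabled for Complete output mode only
      case "console" => false                    // always disabled
      case _         => true                     // foreach and all other sinks
    }
}
```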

userSpecifiedName

userSpecifiedName corresponds to the queryName option (that can be defined using DataStreamWriter's queryName method) while userSpecifiedCheckpointLocation corresponds to the checkpointLocation option.
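
As an illustration of where the two values come from, the following sketch sets both on a DataStreamWriter. NamedQueryDemo and the query name are hypothetical, and checkpointDir is assumed to be a fresh directory (the console sink does not recover from an existing checkpoint).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object NamedQueryDemo {
  def start(spark: SparkSession, checkpointDir: String): StreamingQuery =
    spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .queryName("named_demo")                     // ends up as userSpecifiedName
      .option("checkpointLocation", checkpointDir) // ends up as userSpecifiedCheckpointLocation
      .start()
}
```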

Starting Streaming Query Execution

startQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  extraOptions: Map[String, String],
  sink: BaseStreamingSink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean = false,
  recoverFromCheckpointLocation: Boolean = true,
  trigger: Trigger = ProcessingTime(0),
  triggerClock: Clock = new SystemClock()): StreamingQuery

startQuery starts a streaming query and returns a handle to it.

Internally, startQuery first creates a StreamingQueryWrapper, registers it in activeQueries internal registry (by the id), requests it for the underlying StreamExecution and starts it.

In the end, startQuery returns the StreamingQueryWrapper (as part of the fluent API so you can chain operators) or throws the exception that was reported when attempting to start the query.

startQuery throws an IllegalArgumentException when another query is already registered under the same name (startQuery looks it up in the activeQueries internal registry):

Cannot start query with name [name] as a query with that name is already active

startQuery throws an IllegalStateException when a query with the same id is already active, e.g. when an active query is started again from its checkpoint (startQuery looks the id up in the activeQueries internal registry):

Cannot start query with id [id] as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
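
The two checks can be modelled with a small registry. This is a simplified sketch, not the Spark code: QueryHandle and register are hypothetical stand-ins for the query wrapper and for the registration step of startQuery.

```scala
import java.util.UUID
import scala.collection.mutable

object ActiveQueriesSketch {
  // Hypothetical, simplified stand-in for a streaming query handle
  final case class QueryHandle(id: UUID, name: Option[String])

  private val activeQueries = mutable.HashMap.empty[UUID, QueryHandle]

  def register(query: QueryHandle): QueryHandle = activeQueries.synchronized {
    // No other active query may use the same name
    query.name.foreach { name =>
      if (activeQueries.values.exists(_.name.contains(name))) {
        throw new IllegalArgumentException(
          s"Cannot start query with name $name as a query with that name is already active")
      }
    }
    // No other active query may use the same id
    if (activeQueries.contains(query.id)) {
      throw new IllegalStateException(
        s"Cannot start query with id ${query.id} as another query with same id is already active.")
    }
    activeQueries.put(query.id, query)
    query
  }
}
```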

startQuery is used when DataStreamWriter is requested to start an execution of the streaming query.

Posting StreamingQueryListener Event to StreamingQueryListenerBus

postListenerEvent(
  event: StreamingQueryListener.Event): Unit

postListenerEvent simply posts the input event to the internal event bus for streaming events (StreamingQueryListenerBus).

Figure: StreamingQueryManager Propagates StreamingQueryListener Events

postListenerEvent is used when StreamExecution is requested to post a streaming event.

Handling Termination of Streaming Query (and Deactivating Query in StateStoreCoordinator)

notifyQueryTermination(
  terminatedQuery: StreamingQuery): Unit

notifyQueryTermination removes the terminatedQuery from activeQueries internal registry (by the query id).

notifyQueryTermination records the terminatedQuery in lastTerminatedQuery internal registry (when no earlier streaming query was recorded or the terminatedQuery terminated due to an exception).

notifyQueryTermination notifies others that are blocked on awaitTerminationLock.

In the end, notifyQueryTermination requests StateStoreCoordinator to deactivate all active runs of the streaming query.
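
The interplay between notifyQueryTermination, awaitAnyTermination and resetTerminated can be sketched as follows. This is a simplified model under stated assumptions: the Terminated type and the activate and isActive helpers are hypothetical, the waiting loop is one possible way to block on awaitTerminationLock, and the StateStoreCoordinator step is left out.

```scala
import java.util.UUID
import scala.collection.mutable

object TerminationSketch {
  // Hypothetical, simplified stand-in for a terminated streaming query
  final case class Terminated(id: UUID, exception: Option[Throwable])

  private val activeQueries = mutable.HashSet.empty[UUID]
  private val awaitTerminationLock = new Object
  private var lastTerminatedQuery: Terminated = null

  def activate(id: UUID): Unit = activeQueries.synchronized { activeQueries += id }

  def isActive(id: UUID): Boolean = activeQueries.synchronized(activeQueries.contains(id))

  def notifyQueryTermination(query: Terminated): Unit = {
    // Remove the query from the registry of active queries
    activeQueries.synchronized { activeQueries -= query.id }
    awaitTerminationLock.synchronized {
      // Record the query if none was recorded yet or if it failed
      if (lastTerminatedQuery == null || query.exception.nonEmpty) {
        lastTerminatedQuery = query
      }
      // Wake up threads blocked in awaitAnyTermination
      awaitTerminationLock.notifyAll()
    }
    // Deactivating the query in StateStoreCoordinator is omitted in this sketch
  }

  def awaitAnyTermination(timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    awaitTerminationLock.synchronized {
      while (lastTerminatedQuery == null && System.nanoTime() < deadline) {
        awaitTerminationLock.wait(10)
      }
      lastTerminatedQuery != null
    }
  }

  def resetTerminated(): Unit =
    awaitTerminationLock.synchronized { lastTerminatedQuery = null }
}
```

In this model, awaitAnyTermination keeps returning true after the first termination until resetTerminated clears the recorded query.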

Figure: StreamingQueryManager's Marking Streaming Query as Terminated

notifyQueryTermination is used when StreamExecution is requested to run a streaming query and the query has finished (running streaming batches) (with or without an exception).

Active Streaming Queries by ID

Registry of StreamingQuerys per UUID

Used when StreamingQueryManager is requested for the active streaming queries, to get a streaming query by id, or to start a streaming query, and when it is notified that a streaming query has terminated.

Last-Terminated Streaming Query

StreamingQuery that has recently been terminated (i.e. stopped or due to an exception).

null when no streaming query has terminated yet or after resetTerminated has been called.

StateStoreCoordinatorRef

StateStoreCoordinatorRef to the StateStoreCoordinator RPC Endpoint

Used when: