
StreamingQueryListener — Intercepting Life Cycle Events of Streaming Queries

StreamingQueryListener is an abstraction of listeners that are notified about the life cycle events of all the streaming queries in a Spark Structured Streaming application.

StreamingQueryListener is used internally by StreamingQueryListenerBus to post a streaming event to all registered StreamingQueryListeners.

StreamingQueryListener can be used by Spark developers to intercept events in Spark Structured Streaming applications.

Contract

onQueryProgress

onQueryProgress(
  event: QueryProgressEvent): Unit

Informs that MicroBatchExecution has finished the triggerExecution phase (the end of a streaming batch)

Figure: StreamingQueryListener Notified about Query's Progress (onQueryProgress)

onQueryStarted

onQueryStarted(
  event: QueryStartedEvent): Unit

Informs that DataStreamWriter was requested to start execution of the streaming query (on the stream execution thread)

Figure: StreamingQueryListener Notified about Query's Start (onQueryStarted)

Note

onQueryStarted is used internally to unblock the starting thread of StreamExecution.

onQueryTerminated

onQueryTerminated(
  event: QueryTerminatedEvent): Unit

Informs that a streaming query was stopped or terminated due to an error

Figure: StreamingQueryListener Notified about Query's Termination (onQueryTerminated)
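
The three callbacks of the contract can be implemented together in a single listener. The following is a minimal sketch, assuming Spark 3.x on the classpath. The class name LoggingQueryListener is hypothetical, and the event fields it reads (id, runId, name, progress, exception) are the ones exposed by Spark's public event classes.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener that prints one line per life cycle event.
class LoggingQueryListener extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started: id=${event.id} runId=${event.runId} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"Progress: batchId=${p.batchId} numInputRows=${p.numInputRows}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    // exception is defined only when the query terminated due to an error
    event.exception match {
      case Some(error) => println(s"Terminated with error: id=${event.id} error=$error")
      case None        => println(s"Stopped: id=${event.id}")
    }
}
```

A single listener instance receives events for every streaming query of the session, so the id and runId fields are what tell the queries apart.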

Lifecycle Events

StreamingQueryListener is informed about the life cycle events when StreamingQueryListenerBus is requested to doPostEvent.

QueryStartedEvent

Intercepted by onQueryStarted

Posted when StreamExecution is requested to run stream processing (when DataStreamWriter is requested to start execution of the streaming query on the stream execution thread)

QueryProgressEvent

Intercepted by onQueryProgress

Posted when ProgressReporter is requested to update progress of a streaming query (after MicroBatchExecution has finished triggerExecution phase at the end of a streaming batch)
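
As an illustration of what a QueryProgressEvent carries, the sketch below reads the duration of the triggerExecution phase from the progress report. It assumes Spark 3.x, and that the durationMs map contains a triggerExecution entry, which may not hold for every report (hence the Option). The class name TriggerDurationListener is hypothetical.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical listener that reports how long each trigger took.
class TriggerDurationListener extends StreamingQueryListener {
  // Start and termination events are ignored in this sketch.
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    // durationMs is a java.util.Map, so a missing key yields null; wrap it in Option.
    val triggerMs = Option(progress.durationMs.get("triggerExecution")).map(_.longValue)
    println(s"batch=${progress.batchId} triggerExecution=${triggerMs.getOrElse(-1L)} ms")
  }
}
```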

QueryTerminatedEvent

Intercepted by onQueryTerminated

Posted when StreamExecution is requested to run stream processing (and the streaming query was stopped or terminated due to an error)
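
The three life cycle events can be observed end to end with a short-lived query. This is a sketch under stated assumptions: a local Spark 3.x session, the built-in rate source and console sink, and a hypothetical object name LifecycleDemo. Events are delivered asynchronously, so the exact interleaving of the printed lines with the console sink output is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lifecycle-demo")
      .getOrCreate()

    // Register an anonymous listener before any query is started.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"QueryStartedEvent: ${event.id}")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"QueryProgressEvent: batch ${event.progress.batchId}")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"QueryTerminatedEvent: ${event.id}")
    })

    // The rate source generates rows continuously; the console sink prints each batch.
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .start()

    // Let a few batches run, then stop the query to trigger the termination event.
    query.awaitTermination(5000)
    query.stop()
    spark.stop()
  }
}
```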

Registering StreamingQueryListener

StreamingQueryListener can be registered using StreamingQueryManager.addListener method.

import org.apache.spark.sql.streaming.StreamingQueryListener
val queryListener: StreamingQueryListener = ... // any listener implementation
spark.streams.addListener(queryListener)

Deregistering StreamingQueryListener

StreamingQueryListener can be deregistered using StreamingQueryManager.removeListener method.

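import org.apache.spark.sql.streaming.StreamingQueryListener
val queryListener: StreamingQueryListener = ... // the instance that was registered earlier
spark.streams.removeListener(queryListener)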
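
Registration and deregistration are often paired, for example in tests. The helper below is a sketch of that pattern, not part of Spark: ListenerScope and withListener are hypothetical names, and only addListener and removeListener are actual StreamingQueryManager methods.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener

object ListenerScope {
  // Hypothetical helper: registers the listener, runs body, and always
  // deregisters the listener afterwards, even if body throws.
  def withListener[T](spark: SparkSession, listener: StreamingQueryListener)(body: => T): T = {
    spark.streams.addListener(listener)
    try body
    finally spark.streams.removeListener(listener)
  }
}
```

Because events are delivered asynchronously, events posted shortly before the listener is removed may not reach it; code that depends on seeing the final events should wait for them before leaving the scope.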