
LiveListenerBus

LiveListenerBus is an event bus to dispatch Spark events to registered SparkListeners.

Figure: LiveListenerBus, SparkListenerEvents, and Senders

LiveListenerBus is a single-JVM SparkListenerBus that uses listenerThread to poll events.

Note

The event queue is a java.util.concurrent.LinkedBlockingQueue with a capacity of 10000 SparkListenerEvent events.
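The effect of a bounded queue can be seen in a minimal sketch. It does not use Spark: DemoEvent and BoundedQueueSketch are hypothetical names, and only LinkedBlockingQueue and its offer method come from the JDK.

```scala
import java.util.concurrent.LinkedBlockingQueue

object BoundedQueueSketch {
  // Hypothetical stand-in for SparkListenerEvent.
  final case class DemoEvent(id: Int)

  // Offers `attempts` events to a queue of the given capacity and
  // returns how many offers were rejected because the queue was full.
  def rejectedOffers(capacity: Int, attempts: Int): Int = {
    val queue = new LinkedBlockingQueue[DemoEvent](capacity)
    // offer never blocks: it returns false when no room is left.
    (1 to attempts).count(i => !queue.offer(DemoEvent(i)))
  }
}
```

With a capacity of 3 and 5 offers, two offers are rejected. A rejected offer is what the text below calls a dropped event.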

Creating Instance

LiveListenerBus takes the following to be created:

LiveListenerBus is created (and started) when SparkContext is requested to initialize.

Event Queues

queues: CopyOnWriteArrayList[AsyncEventQueue]

LiveListenerBus manages AsyncEventQueues.

queues is initialized empty when LiveListenerBus is created.

queues is used when:

LiveListenerBusMetrics

metrics: LiveListenerBusMetrics

LiveListenerBus creates a LiveListenerBusMetrics when created.

metrics is registered (with a MetricsSystem) when LiveListenerBus is started.

metrics is used to:

Starting LiveListenerBus

start(
  sc: SparkContext,
  metricsSystem: MetricsSystem): Unit

start starts AsyncEventQueues (from the queues internal registry).

In the end, start requests the given MetricsSystem to register the LiveListenerBusMetrics.
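The order of the two steps can be sketched as follows. This is a simplified model, not Spark code: DemoQueue, DemoMetricsSystem and DemoBus are hypothetical stand-ins for AsyncEventQueue, MetricsSystem and LiveListenerBus, and the guard against a second start is an assumption of the sketch.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean

object StartSketch {
  // Hypothetical stand-in for AsyncEventQueue.
  final class DemoQueue(val name: String) {
    @volatile var started = false
    def start(): Unit = { started = true }
  }

  // Hypothetical stand-in for MetricsSystem.
  final class DemoMetricsSystem {
    @volatile var registered: List[String] = Nil
    def registerSource(source: String): Unit = { registered = source :: registered }
  }

  final class DemoBus {
    val queues = new CopyOnWriteArrayList[DemoQueue]()
    private val started = new AtomicBoolean(false)

    def start(metricsSystem: DemoMetricsSystem): Unit = {
      // Assumption: starting the bus twice is an error.
      if (!started.compareAndSet(false, true)) {
        throw new IllegalStateException("bus already started")
      }
      // 1. Start every queue in the registry.
      val it = queues.iterator()
      while (it.hasNext) it.next().start()
      // 2. Register the metrics source last.
      metricsSystem.registerSource("LiveListenerBus")
    }
  }
}
```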

start is used when:

Posting Event to All Queues

post(
  event: SparkListenerEvent): Unit

post puts the input event onto the internal eventQueue queue and releases the internal eventLock semaphore. If the event cannot be placed on the queue (which can happen since the queue is capped at 10000 events), post calls onDropEvent.

Events can only be posted while the stopped flag is disabled, i.e. before LiveListenerBus has been stopped.
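A minimal sketch of this posting logic, written without Spark. DemoBus and DemoEvent are hypothetical, the counter in onDropEvent replaces the real logging, and the sketch assumes that events posted after the bus has been stopped are silently ignored.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object PostSketch {
  // Hypothetical stand-in for SparkListenerEvent.
  final case class DemoEvent(id: Int)

  final class DemoBus(capacity: Int) {
    val eventQueue = new LinkedBlockingQueue[DemoEvent](capacity)
    val eventLock = new Semaphore(0)
    val stopped = new AtomicBoolean(false)
    val dropped = new AtomicInteger(0)

    def post(event: DemoEvent): Unit = {
      if (!stopped.get()) {
        if (eventQueue.offer(event)) {
          // One permit per queued event wakes up the polling thread.
          eventLock.release()
        } else {
          // Queue is full: report the dropped event.
          onDropEvent(event)
        }
      }
    }

    // The sketch only counts drops; logging is out of scope here.
    def onDropEvent(event: DemoEvent): Unit = { dropped.incrementAndGet(); () }
  }
}
```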

post is used when...FIXME

postToQueues

postToQueues(
  event: SparkListenerEvent): Unit

postToQueues...FIXME

Event Dropped Callback

onDropEvent(
  event: SparkListenerEvent): Unit

onDropEvent is called when no further events can be added to the internal eventQueue queue (while posting a SparkListenerEvent event).

onDropEvent prints out the following ERROR message to the logs, and does so only once, no matter how many events are dropped.

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
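One common way to make sure a message is logged only once is an AtomicBoolean with compareAndSet. The sketch below illustrates that technique. DropReporter and its log callback are hypothetical, and the source does not say which mechanism LiveListenerBus actually uses.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object DropLogSketch {
  // `log` is a hypothetical logging callback supplied by the caller.
  final class DropReporter(log: String => Unit) {
    private val logged = new AtomicBoolean(false)

    def onDropEvent(): Unit = {
      // compareAndSet(false, true) succeeds for exactly one caller,
      // so the message is emitted at most once.
      if (logged.compareAndSet(false, true)) {
        log("Dropping SparkListenerEvent because no remaining room in event queue.")
      }
    }
  }
}
```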

Stopping LiveListenerBus

stop(): Unit

stop enables the stopped flag, releases the internal eventLock semaphore and waits until listenerThread dies. The thread can only die after all posted events have been processed (and polling eventQueue returns nothing).

listenerThread for Event Polling

LiveListenerBus uses a single daemon thread (listenerThread) to poll events from the event queue. Polling begins only after the bus has been started, and events are processed one at a time.
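The interplay of the polling thread, the eventLock semaphore and stop can be modelled in a few lines. This is a sketch under assumptions, not the Spark implementation: DemoBus is hypothetical, events are plain strings, and "delivering" an event just appends it to a collection.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

object PollingSketch {
  final class DemoBus(capacity: Int) {
    private val eventQueue = new LinkedBlockingQueue[String](capacity)
    private val eventLock = new Semaphore(0)
    private val stopped = new AtomicBoolean(false)
    val delivered = new ConcurrentLinkedQueue[String]()

    private val listenerThread = new Thread("demo-listener-bus") {
      setDaemon(true)
      override def run(): Unit = {
        var running = true
        while (running) {
          eventLock.acquire()            // wait for a post or for stop
          val event = eventQueue.poll()  // one event at a time
          if (event == null) running = false // nothing left: stop was requested
          else delivered.add(event)
        }
      }
    }

    def start(): Unit = listenerThread.start()

    def post(event: String): Unit = {
      if (!stopped.get() && eventQueue.offer(event)) eventLock.release()
    }

    def stop(): Unit = {
      if (stopped.compareAndSet(false, true)) {
        eventLock.release()   // extra permit: the thread polls an empty queue
        listenerThread.join() // wait until the thread dies
      }
    }
  }
}
```

Every post adds one permit and stop adds one more, so the thread sees an empty queue only after it has drained all previously posted events.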

Registering Listener with Status Queue

addToStatusQueue(
  listener: SparkListenerInterface): Unit

addToStatusQueue adds the given SparkListenerInterface to the appStatus queue.

addToStatusQueue is used when:

  • BarrierCoordinator is requested to onStart
  • SparkContext is created
  • HiveThriftServer2 utility is used to createListenerAndUI
  • SharedState (Spark SQL) is requested to create a SQLAppStatusStore

Registering Listener with Shared Queue

addToSharedQueue(
  listener: SparkListenerInterface): Unit

addToSharedQueue adds the given SparkListenerInterface to the shared queue.

addToSharedQueue is used when:

Registering Listener with executorManagement Queue

addToManagementQueue(
  listener: SparkListenerInterface): Unit

addToManagementQueue adds the given SparkListenerInterface to the executorManagement queue.

addToManagementQueue is used when:

  • ExecutorAllocationManager is requested to start
  • HeartbeatReceiver is created

Registering Listener with eventLog Queue

addToEventLogQueue(
  listener: SparkListenerInterface): Unit

addToEventLogQueue adds the given SparkListenerInterface to the eventLog queue.

addToEventLogQueue is used when:

  • SparkContext is created (with event logging enabled)
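The four registration methods above differ only in the name of the target queue. A minimal sketch of that pattern follows. It assumes that each method forwards to a generic addToQueue with a fixed queue name. DemoBus and DemoListener are hypothetical, and addToQueue is passed in as a plain function.

```scala
object NamedQueuesSketch {
  // Hypothetical stand-in for SparkListenerInterface.
  trait DemoListener

  // `addToQueue` stands in for the generic (listener, queue name) registration.
  final class DemoBus(addToQueue: (DemoListener, String) => Unit) {
    def addToSharedQueue(l: DemoListener): Unit = addToQueue(l, "shared")
    def addToManagementQueue(l: DemoListener): Unit = addToQueue(l, "executorManagement")
    def addToStatusQueue(l: DemoListener): Unit = addToQueue(l, "appStatus")
    def addToEventLogQueue(l: DemoListener): Unit = addToQueue(l, "eventLog")
  }
}
```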

Registering Listener with Queue

addToQueue(
  listener: SparkListenerInterface,
  queue: String): Unit

addToQueue finds the queue in the queues internal registry.

If found, addToQueue requests it to add the given listener.

If not found, addToQueue creates an AsyncEventQueue (with the given name, the LiveListenerBusMetrics, and this LiveListenerBus) and requests it to add the given listener. The AsyncEventQueue is started and added to the queues internal registry.
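The find-or-create steps above can be sketched as follows. The model is simplified and does not use Spark: DemoQueue, DemoListener and DemoBus are hypothetical stand-ins, the queue constructor takes only a name, and the synchronized block is an assumption to keep lookup and creation atomic.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object AddToQueueSketch {
  // Hypothetical stand-in for SparkListenerInterface.
  trait DemoListener

  // Hypothetical stand-in for AsyncEventQueue.
  final class DemoQueue(val name: String) {
    val listeners = new CopyOnWriteArrayList[DemoListener]()
    @volatile var started = false
    def addListener(l: DemoListener): Unit = { listeners.add(l); () }
    def start(): Unit = { started = true }
  }

  final class DemoBus {
    val queues = new CopyOnWriteArrayList[DemoQueue]()

    def addToQueue(listener: DemoListener, queue: String): Unit = synchronized {
      // Look the queue up by name in the registry.
      var found: Option[DemoQueue] = None
      val it = queues.iterator()
      while (it.hasNext && found.isEmpty) {
        val q = it.next()
        if (q.name == queue) found = Some(q)
      }
      found match {
        case Some(q) =>
          q.addListener(listener)
        case None =>
          // Not registered yet: create, add the listener, start, register.
          val newQueue = new DemoQueue(queue)
          newQueue.addListener(listener)
          newQueue.start()
          queues.add(newQueue)
      }
    }
  }
}
```

Registering two listeners under the same name yields a single queue with two listeners. A new name yields a second queue.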

addToQueue is used when:

Deregistering Listener

removeListener(
  listener: SparkListenerInterface): Unit

removeListener...FIXME

removeListener is used when:


Last update: 2020-11-27