StorageListener

StorageListener is a BlockStatusListener that uses SparkListener callbacks to track changes in the persistence status of RDD blocks in a Spark application.

Table 1. StorageListener’s SparkListener Callbacks (in alphabetical order)
Callback Description

onBlockUpdated

Updates _rddInfoMap with the update to a single block.

onStageCompleted

Removes from _rddInfoMap the RDDInfo instances of the RDDs that participated in the completed stage and are no longer cached.

onStageSubmitted

Updates the _rddInfoMap registry with the names of every RDDInfo in the submitted stage, registering any RDDInfo that is not registered yet.

onUnpersistRDD

Removes the RDDInfo of the unpersisted RDD from the _rddInfoMap registry.

Table 2. StorageListener’s Internal Registries and Counters
Name Description

_rddInfoMap

RDDInfo instances by RDD ID

Used when…FIXME

Creating StorageListener Instance

StorageListener takes a StorageStatusListener when created.

StorageListener initializes the internal registries and counters.

StorageListener is created when SparkUI is created.

Finding Active BlockManagers — activeStorageStatusList Method

activeStorageStatusList: Seq[StorageStatus]

activeStorageStatusList requests the StorageStatusListener for the StorageStatus of every active BlockManager (on executors).
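A minimal sketch of this dependency, assuming activeStorageStatusList simply delegates to the StorageStatusListener given at creation time. StorageStatus, StorageStatusListener and its register/remove methods below are hypothetical, heavily simplified stand-ins, not Spark's classes; the point is that StorageListener holds no executor state of its own and always sees whatever BlockManagers are currently registered.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a BlockManager's StorageStatus (not Spark's class).
final case class StorageStatus(executorId: String)

// Hypothetical stand-in for StorageStatusListener: one status per executor.
class StorageStatusListener {
  private val byExecutor = mutable.LinkedHashMap.empty[String, StorageStatus]
  def register(status: StorageStatus): Unit = byExecutor(status.executorId) = status
  def remove(executorId: String): Unit = byExecutor -= executorId
  def storageStatusList: Seq[StorageStatus] = byExecutor.values.toList
}

// StorageListener keeps the StorageStatusListener it was created with and
// asks it for the active BlockManagers' statuses on every call.
class StorageListener(storageStatusListener: StorageStatusListener) {
  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
}

val statusListener = new StorageStatusListener
val listener = new StorageListener(statusListener)
statusListener.register(StorageStatus("exec-1"))
statusListener.register(StorageStatus("exec-2"))
statusListener.remove("exec-1") // e.g. the executor was lost
println(listener.activeStorageStatusList) // List(StorageStatus(exec-2))
```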

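activeStorageStatusList is used, for example, when StorageListener updates the registered RDDInfos (with block updates from BlockManagers).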

Intercepting Block Status Update Events — onBlockUpdated Callback

onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

onBlockUpdated creates a BlockStatus from the input SparkListenerBlockUpdated and updates the registered RDDInfos (with block updates from BlockManagers), passing in the BlockId and the BlockStatus as a single-element collection of updated blocks.

onBlockUpdated is part of the SparkListener contract to announce a change in a block status (on a BlockManager on an executor).
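The flow can be sketched with hypothetical, simplified types (block IDs and storage levels are plain strings, and StorageListenerSketch is an illustrative name, not Spark's class). The sketch rebuilds a BlockStatus from the event's fields and hands it to a stubbed updateRDDInfo as a one-element collection, which is what lets a single block update reuse the batch-oriented update path.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's types: block IDs and storage
// levels are plain strings here.
final case class BlockUpdatedInfo(blockId: String, storageLevel: String, memSize: Long, diskSize: Long)
final case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo)
final case class BlockStatus(storageLevel: String, memSize: Long, diskSize: Long)

class StorageListenerSketch {
  // Records every batch handed to updateRDDInfo so the flow can be observed.
  val updates = mutable.ArrayBuffer.empty[Seq[(String, BlockStatus)]]

  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
    val info = blockUpdated.blockUpdatedInfo
    // Rebuild a BlockStatus from the event's fields...
    val blockStatus = BlockStatus(info.storageLevel, info.memSize, info.diskSize)
    // ...and pass it on as a single-element collection of updated blocks.
    updateRDDInfo(Seq(info.blockId -> blockStatus))
  }

  // Stub: here it only records its input instead of updating RDDInfos.
  private def updateRDDInfo(updatedBlocks: Seq[(String, BlockStatus)]): Unit =
    updates += updatedBlocks
}

val listener = new StorageListenerSketch
listener.onBlockUpdated(
  SparkListenerBlockUpdated(BlockUpdatedInfo("rdd_0_1", "MEMORY_ONLY", 1024L, 0L)))
println(listener.updates.head) // List((rdd_0_1,BlockStatus(MEMORY_ONLY,1024,0)))
```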

Intercepting Stage Completed Events — onStageCompleted Callback

onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

onStageCompleted finds the identifiers of the RDDs that participated in the completed stage and removes from the _rddInfoMap registry the ones among them that are no longer cached.

onStageCompleted is part of the SparkListener contract to announce that a stage has finished.
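A minimal sketch of the removal rule, using a hypothetical, simplified RDDInfo and a plain Seq of RDD IDs in place of the completed stage's RDDInfos. As an assumption of the sketch, an RDD counts as cached while it has at least one cached partition. An entry is dropped only when both conditions hold: its RDD took part in the stage and it is no longer cached.

```scala
import scala.collection.mutable

// Hypothetical, simplified RDDInfo (not Spark's class).
// Assumption: an RDD counts as cached while it has at least one cached partition.
final class RDDInfo(val id: Int, val name: String, val numCachedPartitions: Int) {
  def isCached: Boolean = numCachedPartitions > 0
}

class StorageListenerSketch {
  val _rddInfoMap = mutable.Map[Int, RDDInfo]()

  // stageRddIds stands in for the IDs of the RDDInfos of the completed stage.
  def onStageCompleted(stageRddIds: Seq[Int]): Unit = {
    val completedRddIds = stageRddIds.toSet
    // Drop an entry only if its RDD took part in the stage AND is no longer cached.
    val toRemove = _rddInfoMap.toList.collect {
      case (id, info) if completedRddIds.contains(id) && !info.isCached => id
    }
    _rddInfoMap --= toRemove
  }
}

val listener = new StorageListenerSketch
listener._rddInfoMap ++= Seq(
  0 -> new RDDInfo(0, "cached", 2),    // in the stage, still cached: kept
  1 -> new RDDInfo(1, "uncached", 0),  // in the stage, not cached: removed
  2 -> new RDDInfo(2, "elsewhere", 0)) // not in the stage: kept
listener.onStageCompleted(Seq(0, 1))
println(listener._rddInfoMap.keys.toList.sorted) // List(0, 2)
```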

Intercepting Stage Submitted Events — onStageSubmitted Callback

onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

onStageSubmitted updates the _rddInfoMap registry with the names of every RDDInfo in stageSubmitted, registering any RDDInfo that is not registered yet.

onStageSubmitted is part of the SparkListener contract to announce that the missing tasks of a stage were submitted for execution.
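The "register if new, then refresh the name" behaviour maps naturally onto getOrElseUpdate on a mutable map. In this sketch RDDInfo is a hypothetical, simplified class whose name is a var so it can be refreshed, and a plain Seq of RDDInfos stands in for the submitted stage. An already registered entry keeps its other state and only picks up the new name.

```scala
import scala.collection.mutable

// Hypothetical, simplified RDDInfo (not Spark's class); name is a var so it can be refreshed.
final class RDDInfo(val id: Int, var name: String, val numCachedPartitions: Int)

class StorageListenerSketch {
  val _rddInfoMap = mutable.Map[Int, RDDInfo]()

  // stageRddInfos stands in for the RDDInfos of the submitted stage.
  def onStageSubmitted(stageRddInfos: Seq[RDDInfo]): Unit =
    stageRddInfos.foreach { info =>
      // Register the RDDInfo if it is new; either way, copy over its current name.
      _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name
    }
}

val listener = new StorageListenerSketch
listener._rddInfoMap(0) = new RDDInfo(0, "old name", 3)
listener.onStageSubmitted(Seq(new RDDInfo(0, "new name", 0), new RDDInfo(7, "fresh", 0)))
println(listener._rddInfoMap(0).name)                // new name
println(listener._rddInfoMap(0).numCachedPartitions) // 3 (existing entry kept)
```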

Intercepting Unpersist RDD Events — onUnpersistRDD Callback

onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit

onUnpersistRDD removes the RDDInfo of the unpersisted RDD (identified by unpersistRDD) from the _rddInfoMap registry.

onUnpersistRDD is part of the SparkListener contract to announce that an RDD has been unpersisted.
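The removal amounts to a single keyed delete. In this sketch, SparkListenerUnpersistRDD and RDDInfo are hypothetical, simplified stand-ins, and StorageListenerSketch is an illustrative name. Removing an RDD ID that was never registered is simply a no-op.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins (not Spark's classes).
final case class SparkListenerUnpersistRDD(rddId: Int)
final case class RDDInfo(id: Int, name: String)

class StorageListenerSketch {
  val _rddInfoMap = mutable.Map[Int, RDDInfo]()

  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit =
    _rddInfoMap.remove(unpersistRDD.rddId) // no-op if the RDD was never registered
}

val listener = new StorageListenerSketch
listener._rddInfoMap ++= Seq(1 -> RDDInfo(1, "a"), 2 -> RDDInfo(2, "b"))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(42)) // never registered: nothing happens
println(listener._rddInfoMap.keys.toList) // List(2)
```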

Updating Registered RDDInfos (with Block Updates from BlockManagers)

updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit

updateRDDInfo finds the IDs of the RDDs that the input updatedBlocks belong to, i.e. it considers only the BlockIds that are RDDBlockIds.

updateRDDInfo then takes the RDDInfo entries in the _rddInfoMap registry for those RDDs and updates these RDDInfos (using StorageStatus) with the StorageStatuses of the active BlockManagers (from activeStorageStatusList).

updateRDDInfo is used exclusively when StorageListener is notified about a change in a block status (on a BlockManager on an executor), i.e. in onBlockUpdated.
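The selection steps can be sketched as follows. The sketch relies on Spark naming RDD blocks rdd_<rddId>_<splitIndex>. Everything else is a hypothetical simplification: block IDs are strings, a storage status is reduced to an executor ID, the asRddId helper and StorageListenerSketch are illustrative names, and StorageUtils.updateRddInfo is replaced by a stub that only records its arguments. Non-RDD blocks (such as broadcast blocks) contribute no RDD ID, so only the affected, registered RDDInfos are recomputed.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical, simplified stand-ins (not Spark's classes): block IDs are strings,
// and a storage status is reduced to its executor ID.
final case class BlockStatus(memSize: Long, diskSize: Long)
final class RDDInfo(val id: Int, val name: String)

// RDD blocks are named "rdd_<rddId>_<splitIndex>"; any other block is not an RDD block.
def asRddId(blockId: String): Option[Int] = blockId.split("_") match {
  case Array("rdd", rddId, _) => Try(rddId.toInt).toOption
  case _                      => None
}

class StorageListenerSketch(activeStorageStatusList: () => Seq[String]) {
  val _rddInfoMap = mutable.Map[Int, RDDInfo]()
  // Records what would be handed to StorageUtils.updateRddInfo (stubbed below).
  var lastUpdate: (Seq[Int], Seq[String]) = (Nil, Nil)

  def updateRDDInfo(updatedBlocks: Seq[(String, BlockStatus)]): Unit = {
    // 1. Keep only RDD blocks and collect the IDs of their RDDs.
    val rddIdsToUpdate = updatedBlocks.flatMap { case (blockId, _) => asRddId(blockId) }.toSet
    // 2. Take the registered RDDInfos of those RDDs.
    val rddInfosToUpdate = _rddInfoMap.values.toList.filter(info => rddIdsToUpdate.contains(info.id))
    // 3. Update them using the active storage statuses.
    updateRddInfo(rddInfosToUpdate, activeStorageStatusList())
  }

  // Stub for StorageUtils.updateRddInfo: only records its arguments.
  private def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[String]): Unit =
    lastUpdate = (rddInfos.map(_.id).sorted, statuses)
}

val listener = new StorageListenerSketch(() => Seq("exec-1"))
(0 to 2).foreach(id => listener._rddInfoMap(id) = new RDDInfo(id, s"rdd-$id"))
listener.updateRDDInfo(Seq(
  "rdd_0_0" -> BlockStatus(10L, 0L),
  "rdd_2_3" -> BlockStatus(0L, 5L),
  "broadcast_1" -> BlockStatus(1L, 0L))) // not an RDD block: ignored
println(listener.lastUpdate) // (List(0, 2),List(exec-1))
```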

Updating RDDInfos (using StorageStatus) — StorageUtils.updateRddInfo Method

updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit
FIXME