
ExecutorMonitor

ExecutorMonitor is a SparkListener and a CleanerListener.

Creating Instance

ExecutorMonitor takes the following to be created:

ExecutorMonitor is created when:

  • ExecutorAllocationManager is created

shuffleIds Registry

shuffleIds: Set[Int]

ExecutorMonitor uses a mutable HashSet to track shuffle IDs...FIXME

shuffleIds is initialized only when shuffleTrackingEnabled is enabled.

shuffleIds is used by the internal Tracker class in the following methods:

  • updateTimeout, addShuffle, removeShuffle and updateActiveShuffles
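The shuffle bookkeeping above can be modeled as a simplified, hypothetical tracker. `ShuffleTracker` and `holdsShuffleData` are illustrative names, not Spark's API. The method names mirror the list above, but their semantics here are assumptions of this sketch. For example, it assumes that `updateActiveShuffles` recomputes `hasActiveShuffle` from the shuffles that running jobs still use.

```scala
import scala.collection.mutable

// Hypothetical model of per-executor shuffle bookkeeping (not Spark's code).
class ShuffleTracker(shuffleTrackingEnabled: Boolean) {
  // Allocated only when shuffle tracking is enabled; null otherwise.
  private val shuffleIds: mutable.HashSet[Int] =
    if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null

  // True when this executor holds output of a shuffle that a running job still uses.
  var hasActiveShuffle: Boolean = false

  // Record that this executor wrote output for the given shuffle.
  def addShuffle(id: Int): Unit =
    if (shuffleIds != null && shuffleIds.add(id)) hasActiveShuffle = true

  // Forget a shuffle; clear the active flag once no shuffle output remains.
  def removeShuffle(id: Int): Unit =
    if (shuffleIds != null && shuffleIds.remove(id) && shuffleIds.isEmpty)
      hasActiveShuffle = false

  // Recompute the active flag from the shuffle IDs that running jobs still use.
  def updateActiveShuffles(activeIds: Iterable[Int]): Unit =
    if (shuffleIds != null) hasActiveShuffle = activeIds.exists(shuffleIds.contains)

  // What an updateTimeout-style check could consult: any shuffle output at all?
  def holdsShuffleData: Boolean = shuffleIds != null && shuffleIds.nonEmpty
}
```

Because the set stays `null` when tracking is disabled (mirroring the rule that shuffleIds is initialized conditionally), every method guards against it and becomes a no-op.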

Executors Registry

executors: ConcurrentHashMap[String, Tracker]

ExecutorMonitor uses a Java ConcurrentHashMap to track available executors.

An executor is added (via ensureExecutorIsTracked) when:

An executor is removed in onExecutorRemoved.

All executors are removed in reset.
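The registry's lifecycle (add if missing, remove one, clear all) can be sketched as follows. `ExecTracker` and `ExecutorsRegistry` are hypothetical stand-ins. `ConcurrentHashMap.computeIfAbsent` gives the add-if-missing behavior of ensureExecutorIsTracked atomically. This presumably matters because listener events and allocation-manager calls can arrive on different threads, although that motivation is an assumption.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for Tracker: only what this sketch needs.
final class ExecTracker(val resourceProfileId: Int) {
  @volatile var pendingRemoval: Boolean = false
}

// Simplified model of the executors registry: executor ID -> tracker.
class ExecutorsRegistry {
  private val executors = new ConcurrentHashMap[String, ExecTracker]()

  // ensureExecutorIsTracked-style lookup: create the tracker on first sight,
  // otherwise return the existing one (atomic thanks to computeIfAbsent).
  def ensureExecutorIsTracked(id: String, resourceProfileId: Int): ExecTracker =
    executors.computeIfAbsent(id, _ => new ExecTracker(resourceProfileId))

  // onExecutorRemoved-style removal of a single executor.
  def remove(id: String): Unit = executors.remove(id)

  // reset-style removal of all executors.
  def reset(): Unit = executors.clear()

  def executorCount: Int = executors.size()
}
```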

executors is used when:

fetchFromShuffleSvcEnabled Flag

fetchFromShuffleSvcEnabled: Boolean

ExecutorMonitor initializes fetchFromShuffleSvcEnabled internal flag based on the values of spark.shuffle.service.enabled and spark.shuffle.service.fetch.rdd.enabled configuration properties.

fetchFromShuffleSvcEnabled is enabled (true) only when both configuration properties are enabled.
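The flag derivation is a logical AND of two booleans. To stay self-contained, the sketch below reads the properties from a plain `Map[String, String]` instead of SparkConf. It treats a missing key as `false`. That fallback belongs to this sketch only and is not a claim about Spark's defaults. `FetchFlag` is a hypothetical name.

```scala
// Hypothetical helper object; not part of Spark.
object FetchFlag {
  // Missing or non-"true" values count as false (sketch assumption).
  def bool(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Enabled only when both properties are enabled.
  def fetchFromShuffleSvcEnabled(conf: Map[String, String]): Boolean =
    bool(conf, "spark.shuffle.service.enabled") &&
      bool(conf, "spark.shuffle.service.fetch.rdd.enabled")
}
```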

fetchFromShuffleSvcEnabled is used when:

shuffleTrackingEnabled Flag

shuffleTrackingEnabled: Boolean

ExecutorMonitor initializes shuffleTrackingEnabled internal flag based on the values of spark.shuffle.service.enabled and spark.dynamicAllocation.shuffleTracking.enabled configuration properties.

shuffleTrackingEnabled is enabled (true) when the following holds:

  1. spark.shuffle.service.enabled is disabled
  2. spark.dynamicAllocation.shuffleTracking.enabled is enabled
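The two conditions combine as `!serviceEnabled && trackingEnabled`. As in any Map-based sketch, `ShuffleTrackingFlag` is hypothetical. Treating a missing key as `false` is an assumption here, and Spark's actual default for spark.dynamicAllocation.shuffleTracking.enabled may differ between versions.

```scala
// Hypothetical helper object; not part of Spark.
object ShuffleTrackingFlag {
  private def bool(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Enabled when the external shuffle service is off AND shuffle tracking is on.
  def shuffleTrackingEnabled(conf: Map[String, String]): Boolean =
    !bool(conf, "spark.shuffle.service.enabled") &&
      bool(conf, "spark.dynamicAllocation.shuffleTracking.enabled")
}
```

A plausible rationale, not stated in the text: an external shuffle service keeps serving shuffle files after their executor is removed. Executor-side shuffle tracking only matters when that service is absent.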

When disabled, shuffleTrackingEnabled is used to skip execution of the following (making them no-ops):

When enabled, shuffleTrackingEnabled is used for the following:

spark.dynamicAllocation.cachedExecutorIdleTimeout

ExecutorMonitor reads the spark.dynamicAllocation.cachedExecutorIdleTimeout configuration property for Tracker to use in updateTimeout.
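The sketch below shows one way the property could feed into an updateTimeout-style deadline. Per the Spark configuration docs, spark.dynamicAllocation.cachedExecutorIdleTimeout applies to idle executors that hold cached data blocks. Several details are assumptions of this sketch: a separate `idleTimeoutNs` for executors without cached blocks, the `TimeoutConfig` and `IdleDeadline` names, and `Long.MaxValue` standing for an infinite timeout.

```scala
// Hypothetical timeouts in nanoseconds; Long.MaxValue means "never".
final case class TimeoutConfig(idleTimeoutNs: Long, storageTimeoutNs: Long)

object IdleDeadline {
  // idleStartNs < 0 means the executor is busy (running tasks): no deadline.
  def timeoutAt(idleStartNs: Long, hasCachedBlocks: Boolean, conf: TimeoutConfig): Long =
    if (idleStartNs < 0) Long.MaxValue
    else {
      // Executors holding cached blocks use the cached-executor timeout.
      val timeout = if (hasCachedBlocks) conf.storageTimeoutNs else conf.idleTimeoutNs
      val deadline = idleStartNs + timeout
      // Adding Long.MaxValue overflows to a negative number: treat as "never".
      if (deadline >= 0) deadline else Long.MaxValue
    }
}
```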

onBlockUpdated

onBlockUpdated(
  event: SparkListenerBlockUpdated): Unit

onBlockUpdated is part of the SparkListenerInterface abstraction.

onBlockUpdated...FIXME

onExecutorAdded

onExecutorAdded(
  event: SparkListenerExecutorAdded): Unit

onExecutorAdded is part of the SparkListenerInterface abstraction.

onExecutorAdded...FIXME

onExecutorRemoved

onExecutorRemoved(
  event: SparkListenerExecutorRemoved): Unit

onExecutorRemoved is part of the SparkListenerInterface abstraction.

onExecutorRemoved...FIXME

onJobEnd

onJobEnd(
  event: SparkListenerJobEnd): Unit

onJobEnd is part of the SparkListenerInterface abstraction.

onJobEnd...FIXME

onJobStart

onJobStart(
  event: SparkListenerJobStart): Unit

onJobStart is part of the SparkListenerInterface abstraction.

Note

onJobStart does nothing and simply returns when the shuffleTrackingEnabled flag is turned off (false).

onJobStart requests the input SparkListenerJobStart for the StageInfos and converts...FIXME
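The guard and the conversion step can be sketched under explicit assumptions. `StageInfoLite` is a hypothetical stand-in for StageInfo that carries only a stage ID and an optional ID of the shuffle the stage writes. The two maps (`shuffleToActiveJobs`, `stageToShuffleId`) are illustrative bookkeeping, not documented fields.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StageInfo.
final case class StageInfoLite(stageId: Int, shuffleDepId: Option[Int])

class JobShuffleBookkeeping(shuffleTrackingEnabled: Boolean) {
  // shuffle ID -> IDs of jobs that use it
  val shuffleToActiveJobs = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Int]]
  // stage ID -> ID of the shuffle the stage writes
  val stageToShuffleId = mutable.HashMap.empty[Int, Int]

  def onJobStart(jobId: Int, stageInfos: Seq[StageInfoLite]): Unit = {
    // As in the note above: do nothing when shuffle tracking is disabled.
    if (!shuffleTrackingEnabled) return

    // Keep only shuffle map stages, as (stageId, shuffleId) pairs.
    val shuffleStages: Seq[(Int, Int)] =
      stageInfos.flatMap(s => s.shuffleDepId.map(id => s.stageId -> id))

    // Register this job as a user of every shuffle its stages write.
    shuffleStages.foreach { case (_, shuffleId) =>
      shuffleToActiveJobs.getOrElseUpdate(shuffleId, mutable.ArrayBuffer.empty[Int]) += jobId
    }
    stageToShuffleId ++= shuffleStages
  }
}
```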

onOtherEvent

onOtherEvent(
  event: SparkListenerEvent): Unit

onOtherEvent is part of the SparkListenerInterface abstraction.

onOtherEvent...FIXME

cleanupShuffle

cleanupShuffle(
  id: Int): Unit

cleanupShuffle...FIXME

cleanupShuffle is used in onOtherEvent.
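cleanupShuffle is reached through onOtherEvent, so the call path can be sketched as a pattern match on the event type. `ShuffleCleaned`, `SomeOtherEvent` and the per-executor map are hypothetical. The text above does not say which event triggers the cleanup. It also does not say exactly what cleanupShuffle clears. This sketch assumes it forgets the shuffle on every executor.

```scala
import scala.collection.mutable

// Hypothetical event types standing in for SparkListenerEvent subclasses.
sealed trait ListenerEvent
final case class ShuffleCleaned(shuffleId: Int) extends ListenerEvent
case object SomeOtherEvent extends ListenerEvent

class ShuffleCleanupModel {
  // executor ID -> IDs of shuffles whose output the executor holds
  val shuffleIdsByExecutor = mutable.HashMap.empty[String, mutable.HashSet[Int]]

  // onOtherEvent-style dispatch: only the cleanup event is acted upon.
  def onOtherEvent(event: ListenerEvent): Unit = event match {
    case ShuffleCleaned(id) => cleanupShuffle(id)
    case _ => // all other events are ignored
  }

  // Forget the shuffle on every tracked executor.
  private def cleanupShuffle(id: Int): Unit =
    shuffleIdsByExecutor.values.foreach(_ -= id)
}
```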

onTaskEnd

onTaskEnd(
  event: SparkListenerTaskEnd): Unit

onTaskEnd is part of the SparkListenerInterface abstraction.

onTaskEnd...FIXME

onTaskStart

onTaskStart(
  event: SparkListenerTaskStart): Unit

onTaskStart is part of the SparkListenerInterface abstraction.

onTaskStart...FIXME

onUnpersistRDD

onUnpersistRDD(
  event: SparkListenerUnpersistRDD): Unit

onUnpersistRDD is part of the SparkListenerInterface abstraction.

onUnpersistRDD...FIXME

reset

reset(): Unit

reset...FIXME

reset is used when:

  • FIXME

shuffleCleaned

shuffleCleaned(
  shuffleId: Int): Unit

shuffleCleaned is part of the CleanerListener abstraction.

shuffleCleaned...FIXME

timedOutExecutors

timedOutExecutors(): Seq[String]
timedOutExecutors(
  when: Long): Seq[String]

timedOutExecutors...FIXME

timedOutExecutors is used when:

  • ExecutorAllocationManager is requested to schedule
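One way timedOutExecutors could pick executors to release is to compare each executor's deadline with the given time, in the spirit of the `timedOutExecutors(when: Long)` variant. The `ExecState` fields are assumptions of this sketch. So is skipping executors that are pending removal or hold active shuffle data. The result is sorted only to make the output deterministic.

```scala
// Hypothetical per-executor state for this sketch only.
final case class ExecState(timeoutAtNs: Long, pendingRemoval: Boolean, hasActiveShuffle: Boolean)

object TimedOut {
  // IDs of executors whose idle deadline has passed at `now`,
  // excluding those already pending removal or holding active shuffle data.
  def timedOutExecutors(executors: Map[String, ExecState], now: Long): Seq[String] =
    executors.collect {
      case (id, e) if !e.pendingRemoval && !e.hasActiveShuffle && e.timeoutAtNs <= now => id
    }.toSeq.sorted
}
```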

executorCount

executorCount: Int

executorCount...FIXME

executorCount is used when:

pendingRemovalCount

pendingRemovalCount: Int

pendingRemovalCount...FIXME

pendingRemovalCount is used when:

executorsKilled

executorsKilled(
  ids: Seq[String]): Unit

executorsKilled...FIXME
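executorCount, pendingRemovalCount and executorsKilled are all still marked FIXME above. This hypothetical sketch assumes that executorsKilled only flags executors as pending removal, and that they stay in the registry until they are actually removed. That reading is consistent with a separate pendingRemovalCount. `KillTracker` and `KillBookkeeping` are illustrative names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical minimal tracker for this sketch.
final class KillTracker { @volatile var pendingRemoval: Boolean = false }

class KillBookkeeping {
  val executors = new ConcurrentHashMap[String, KillTracker]()

  def executorCount: Int = executors.size()

  // Executors asked to be killed but not yet removed from the registry.
  def pendingRemovalCount: Int = {
    val it = executors.values().iterator()
    var n = 0
    while (it.hasNext) if (it.next().pendingRemoval) n += 1
    n
  }

  // Flag the given executors as pending removal; unknown IDs are ignored.
  def executorsKilled(ids: Seq[String]): Unit =
    ids.foreach { id =>
      val t = executors.get(id)
      if (t != null) t.pendingRemoval = true
    }
}
```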

executorsKilled is used when:

ensureExecutorIsTracked

ensureExecutorIsTracked(
  id: String,
  resourceProfileId: Int): Tracker

ensureExecutorIsTracked...FIXME

ensureExecutorIsTracked is used when:

getResourceProfileId

getResourceProfileId(
  executorId: String): Int

getResourceProfileId...FIXME

getResourceProfileId is used for testing only.