ExecutorMonitor¶
ExecutorMonitor is a SparkListener and a CleanerListener.
Creating Instance¶
ExecutorMonitor takes the following to be created:
ExecutorMonitor is created when:
- ExecutorAllocationManager is created
shuffleIds Registry¶
shuffleIds: Set[Int]
ExecutorMonitor uses a mutable HashSet to track shuffle IDs...FIXME
shuffleIds is initialized only when shuffleTrackingEnabled is enabled.
shuffleIds is used by the Tracker internal class for the following:
- updateTimeout
- addShuffle
- removeShuffle
- updateActiveShuffles
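The registry can be pictured with a small stand-in for Tracker. This is a minimal sketch under stated assumptions, not Spark's code: the class name ShuffleIdsTracker and the hasShuffleData helper are hypothetical, and updateTimeout and updateActiveShuffles are left out. It only shows the "null unless shuffle tracking is enabled" initialization and the add/remove operations.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ExecutorMonitor's Tracker.
// The set of shuffle IDs exists only when shuffle tracking is enabled (null otherwise).
class ShuffleIdsTracker(shuffleTrackingEnabled: Boolean) {
  private val shuffleIds: mutable.HashSet[Int] =
    if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null

  // Record that this executor holds shuffle data for the given shuffle (no-op when disabled).
  def addShuffle(id: Int): Unit =
    if (shuffleIds != null) shuffleIds += id

  // Forget a shuffle, e.g. after it has been cleaned up (no-op when disabled).
  def removeShuffle(id: Int): Unit =
    if (shuffleIds != null) shuffleIds -= id

  // Hypothetical helper: true when at least one tracked shuffle remains.
  def hasShuffleData: Boolean = shuffleIds != null && shuffleIds.nonEmpty
}
```

With tracking disabled, every call is a cheap no-op, which mirrors the "initialized only when shuffleTrackingEnabled is enabled" rule above.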
Executors Registry¶
executors: ConcurrentHashMap[String, Tracker]
ExecutorMonitor uses a Java ConcurrentHashMap to track available executors.
An executor is added (via ensureExecutorIsTracked) when:
An executor is removed in onExecutorRemoved.
All executors are removed in reset.
executors is used when:
- onOtherEvent (cleanupShuffle)
- executorCount
- executorsKilled
- onUnpersistRDD
- onTaskEnd
- onJobStart
- onJobEnd
- pendingRemovalCount
- timedOutExecutors
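A minimal sketch of such a registry follows, assuming a simplified Tracker that records only a resource profile ID and a pending-removal flag. ExecutorRegistry, removeExecutor and isTracked are hypothetical names, and computeIfAbsent is just one plausible way to make "track if not already tracked" atomic on a ConcurrentHashMap.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, minimal per-executor state; the real Tracker carries much more.
final class Tracker(val resourceProfileId: Int) {
  @volatile var pendingRemoval: Boolean = false
}

// Sketch of an executor registry keyed by executor ID.
class ExecutorRegistry {
  private val executors = new ConcurrentHashMap[String, Tracker]()

  // Return the existing tracker, or atomically register a new one.
  def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker =
    executors.computeIfAbsent(id, (_: String) => new Tracker(resourceProfileId))

  // Drop a single executor (cf. onExecutorRemoved).
  def removeExecutor(id: String): Unit = executors.remove(id)

  // Drop every tracked executor (cf. reset).
  def reset(): Unit = executors.clear()

  def isTracked(id: String): Boolean = executors.containsKey(id)

  def executorCount: Int = executors.size()
}
```

Using computeIfAbsent means two callers racing on the same executor ID get the same Tracker instead of one overwriting the other.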
fetchFromShuffleSvcEnabled Flag¶
fetchFromShuffleSvcEnabled: Boolean
ExecutorMonitor initializes the fetchFromShuffleSvcEnabled internal flag based on the values of the spark.shuffle.service.enabled and spark.shuffle.service.fetch.rdd.enabled configuration properties.
fetchFromShuffleSvcEnabled is enabled (true) when both configuration properties are enabled.
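The rule can be sketched as a single conjunction. To keep the example free of a Spark dependency, a plain Map[String, String] stands in for SparkConf (an assumption), and an unset property is treated as false. FetchFromShuffleSvc is a hypothetical name.

```scala
// Sketch only: a Map of raw settings stands in for SparkConf.
object FetchFromShuffleSvc {
  // Unset keys count as false; a value other than "true"/"false" throws IllegalArgumentException.
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.toBoolean)

  // Both properties must be enabled for the flag to be true.
  def fetchFromShuffleSvcEnabled(conf: Map[String, String]): Boolean =
    flag(conf, "spark.shuffle.service.enabled") &&
      flag(conf, "spark.shuffle.service.fetch.rdd.enabled")
}
```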
fetchFromShuffleSvcEnabled is used when:
shuffleTrackingEnabled Flag¶
shuffleTrackingEnabled: Boolean
ExecutorMonitor initializes the shuffleTrackingEnabled internal flag based on the values of the spark.shuffle.service.enabled and spark.dynamicAllocation.shuffleTracking.enabled configuration properties.
shuffleTrackingEnabled is enabled (true) when both of the following hold:
- spark.shuffle.service.enabled is disabled
- spark.dynamicAllocation.shuffleTracking.enabled is enabled
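The two conditions above reduce to one Boolean expression. In this sketch the parameters are hypothetical stand-ins for the resolved values of spark.shuffle.service.enabled and spark.dynamicAllocation.shuffleTracking.enabled; ShuffleTracking is an illustrative name.

```scala
object ShuffleTracking {
  // Shuffle tracking applies only when there is no external shuffle service
  // and tracking has been requested explicitly.
  def shuffleTrackingEnabled(shuffleServiceEnabled: Boolean, shuffleTrackingConf: Boolean): Boolean =
    !shuffleServiceEnabled && shuffleTrackingConf
}
```

The intuition: with an external shuffle service, shuffle files outlive their executors, so there is nothing to track per executor.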
When disabled, shuffleTrackingEnabled is used to skip execution of the following (making them no-ops):
When enabled, shuffleTrackingEnabled is used for the following:
spark.dynamicAllocation.cachedExecutorIdleTimeout¶
ExecutorMonitor reads the spark.dynamicAllocation.cachedExecutorIdleTimeout configuration property, which Tracker uses in updateTimeout.
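How the cached-executor timeout might feed into an idle deadline is sketched below. This is a simplified, hypothetical model of updateTimeout, assuming that an idle executor holding cached blocks uses the cachedExecutorIdleTimeout value while any other idle executor uses the regular idle timeout. The shuffle-related part of the real logic is omitted, and IdleDeadline.timeoutAt and its parameters are illustrative names.

```scala
// Hypothetical deadline computation; all times are in nanoseconds.
// idleStart < 0 means the executor is currently busy.
object IdleDeadline {
  def timeoutAt(
      idleStart: Long,
      hasCachedBlocks: Boolean,
      idleTimeoutNs: Long,
      cachedIdleTimeoutNs: Long): Long =
    if (idleStart < 0) Long.MaxValue // busy executors never time out
    else {
      // Executors with cached blocks get the (usually longer) cached timeout.
      val timeout = if (hasCachedBlocks) cachedIdleTimeoutNs else idleTimeoutNs
      val deadline = idleStart + timeout
      // Overflow (e.g. an effectively infinite timeout) means "never".
      if (deadline >= 0) deadline else Long.MaxValue
    }
}
```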
onBlockUpdated¶
onBlockUpdated(
event: SparkListenerBlockUpdated): Unit
onBlockUpdated is part of the SparkListenerInterface abstraction.
onBlockUpdated...FIXME
onExecutorAdded¶
onExecutorAdded(
event: SparkListenerExecutorAdded): Unit
onExecutorAdded is part of the SparkListenerInterface abstraction.
onExecutorAdded...FIXME
onExecutorRemoved¶
onExecutorRemoved(
event: SparkListenerExecutorRemoved): Unit
onExecutorRemoved is part of the SparkListenerInterface abstraction.
onExecutorRemoved...FIXME
onJobEnd¶
onJobEnd(
event: SparkListenerJobEnd): Unit
onJobEnd is part of the SparkListenerInterface abstraction.
onJobEnd...FIXME
onJobStart¶
onJobStart(
event: SparkListenerJobStart): Unit
onJobStart is part of the SparkListenerInterface abstraction.
Note: onJobStart does nothing and simply returns when the shuffleTrackingEnabled flag is turned off (false).
onJobStart requests the input SparkListenerJobStart for the StageInfos and converts...FIXME
onOtherEvent¶
onOtherEvent(
event: SparkListenerEvent): Unit
onOtherEvent is part of the SparkListenerInterface abstraction.
onOtherEvent...FIXME
cleanupShuffle¶
cleanupShuffle(
id: Int): Unit
cleanupShuffle...FIXME
cleanupShuffle is used in onOtherEvent.
onTaskEnd¶
onTaskEnd(
event: SparkListenerTaskEnd): Unit
onTaskEnd is part of the SparkListenerInterface abstraction.
onTaskEnd...FIXME
onTaskStart¶
onTaskStart(
event: SparkListenerTaskStart): Unit
onTaskStart is part of the SparkListenerInterface abstraction.
onTaskStart...FIXME
onUnpersistRDD¶
onUnpersistRDD(
event: SparkListenerUnpersistRDD): Unit
onUnpersistRDD is part of the SparkListenerInterface abstraction.
onUnpersistRDD...FIXME
reset¶
reset(): Unit
reset...FIXME
reset is used when:
- FIXME
shuffleCleaned¶
shuffleCleaned(
shuffleId: Int): Unit
shuffleCleaned is part of the CleanerListener abstraction.
shuffleCleaned...FIXME
timedOutExecutors¶
timedOutExecutors(): Seq[String]
timedOutExecutors(
when: Long): Seq[String]
timedOutExecutors...FIXME
timedOutExecutors is used when:
- ExecutorAllocationManager is requested to schedule
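The selection that timedOutExecutors performs is left as FIXME above. One plausible shape is sketched here, assuming each executor carries an idle deadline (timeoutAt) and a pending-removal flag, and that an executor counts as timed out once its deadline is at or before the when argument and it is not already being removed. ExecutorState and TimedOut are hypothetical names, and the input is an immutable snapshot rather than the live registry.

```scala
// Hypothetical snapshot of per-executor state used to pick removal candidates.
final case class ExecutorState(timeoutAt: Long, pendingRemoval: Boolean)

object TimedOut {
  // IDs of executors whose idle deadline has passed and that are not already
  // pending removal; sorted only to make the result deterministic.
  def timedOutExecutors(executors: Map[String, ExecutorState], when: Long): Seq[String] =
    executors.collect {
      case (id, s) if !s.pendingRemoval && s.timeoutAt <= when => id
    }.toSeq.sorted
}
```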
executorCount¶
executorCount: Int
executorCount...FIXME
executorCount is used when:
- ExecutorAllocationManager is requested to addExecutors and removeExecutors
- ExecutorAllocationManagerSource is requested for the numberAllExecutors performance metric
pendingRemovalCount¶
pendingRemovalCount: Int
pendingRemovalCount...FIXME
pendingRemovalCount is used when:
- ExecutorAllocationManager is requested to removeExecutors
- ExecutorAllocationManagerSource is requested for the numberExecutorsPendingToRemove performance metric
executorsKilled¶
executorsKilled(
ids: Seq[String]): Unit
executorsKilled...FIXME
executorsKilled is used when:
- ExecutorAllocationManager is requested to removeExecutors
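executorCount, pendingRemovalCount and executorsKilled can be read as bookkeeping over the executors registry. The sketch below assumes that executorsKilled marks the named executors as pending removal and that pendingRemovalCount counts executors so marked; RemovalTracker, RemovalBookkeeping and track are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical per-executor state: only the pending-removal flag matters here.
final class RemovalTracker {
  @volatile var pendingRemoval: Boolean = false
}

class RemovalBookkeeping {
  private val executors = new ConcurrentHashMap[String, RemovalTracker]()

  // Register an executor if it is not tracked yet.
  def track(id: String): Unit = executors.putIfAbsent(id, new RemovalTracker)

  // Mark executors that were asked to be killed; unknown IDs are ignored.
  def executorsKilled(ids: Seq[String]): Unit =
    ids.foreach { id =>
      val tracker = executors.get(id)
      if (tracker != null) tracker.pendingRemoval = true
    }

  // All tracked executors, including those pending removal.
  def executorCount: Int = executors.size()

  // Only the executors already marked for removal.
  def pendingRemovalCount: Int =
    executors.values().asScala.count(_.pendingRemoval)
}
```

Under this model, killed executors stay in the registry (and in executorCount) until they are actually removed, which is why pendingRemovalCount is reported separately.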
ensureExecutorIsTracked¶
ensureExecutorIsTracked(
id: String,
resourceProfileId: Int): Tracker
ensureExecutorIsTracked...FIXME
ensureExecutorIsTracked is used when:
getResourceProfileId¶
getResourceProfileId(
executorId: String): Int
getResourceProfileId...FIXME
getResourceProfileId is used for testing only.