ExecutorAllocationManager
ExecutorAllocationManager can be used to dynamically allocate executors based on processing workload.
ExecutorAllocationManager intercepts Spark events using the internal ExecutorAllocationListener that keeps track of the workload.
Creating Instance
ExecutorAllocationManager takes the following to be created:
- ExecutorAllocationClient
- LiveListenerBus
- SparkConf
- ContextCleaner (default: None)
- Clock (default: SystemClock)
ExecutorAllocationManager is created (and started) when SparkContext is created (with Dynamic Allocation of Executors enabled).
Validating Configuration
validateSettings(): Unit
validateSettings makes sure that the settings for dynamic allocation are correct.
validateSettings throws a SparkException when any of the following is not met:
- spark.dynamicAllocation.minExecutors must not be negative
- spark.dynamicAllocation.maxExecutors must be greater than 0
- spark.dynamicAllocation.minExecutors must be less than or equal to spark.dynamicAllocation.maxExecutors
- spark.dynamicAllocation.executorIdleTimeout must be greater than 0
- spark.shuffle.service.enabled must be enabled
- The number of tasks per executor (spark.executor.cores divided by spark.task.cpus) must not be zero
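The rules above are simple range checks. The following Scala sketch models them over a hypothetical DynAllocSettings case class (not a Spark type). It uses require, which throws IllegalArgumentException, in place of SparkException so that it runs without Spark on the classpath. It treats both executor bounds as non-negative and maxExecutors as non-zero. It also adds an extra guard that spark.task.cpus is positive to avoid dividing by zero. It does not reproduce Spark's exact error messages.

```scala
// Hypothetical model of the dynamic allocation settings (not a Spark class).
final case class DynAllocSettings(
    minExecutors: Int,              // spark.dynamicAllocation.minExecutors
    maxExecutors: Int,              // spark.dynamicAllocation.maxExecutors
    executorIdleTimeoutS: Long,     // spark.dynamicAllocation.executorIdleTimeout (seconds)
    shuffleServiceEnabled: Boolean, // spark.shuffle.service.enabled
    executorCores: Int,             // spark.executor.cores
    taskCpus: Int)                  // spark.task.cpus

object ValidateSettingsSketch {
  // require throws IllegalArgumentException, standing in for SparkException.
  def validate(s: DynAllocSettings): Unit = {
    require(s.minExecutors >= 0 && s.maxExecutors >= 0,
      "spark.dynamicAllocation.{min,max}Executors must not be negative")
    require(s.maxExecutors != 0,
      "spark.dynamicAllocation.maxExecutors cannot be 0")
    require(s.minExecutors <= s.maxExecutors,
      s"minExecutors (${s.minExecutors}) must be <= maxExecutors (${s.maxExecutors})")
    require(s.executorIdleTimeoutS > 0,
      "spark.dynamicAllocation.executorIdleTimeout must be > 0")
    require(s.shuffleServiceEnabled,
      "dynamic allocation requires spark.shuffle.service.enabled=true")
    // Extra guard (an assumption of this sketch): avoid division by zero.
    require(s.taskCpus > 0, "spark.task.cpus must be > 0")
    // Tasks per executor must not be zero.
    require(s.executorCores / s.taskCpus != 0,
      "spark.executor.cores must not be less than spark.task.cpus")
  }
}
```

For example, DynAllocSettings(0, 10, 60L, true, 4, 1) passes. The same settings with maxExecutors = 0, or with executorCores = 1 and taskCpus = 2, are rejected.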
Performance Metrics
ExecutorAllocationManager uses ExecutorAllocationManagerSource for performance metrics.
ExecutorMonitor
ExecutorAllocationManager creates an ExecutorMonitor when created.
ExecutorMonitor is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorMonitor is attached (to the ContextCleaner) when ExecutorAllocationManager is started.
ExecutorMonitor is requested to reset when ExecutorAllocationManager is requested to reset.
ExecutorMonitor is used for the performance metrics:
- numberExecutorsPendingToRemove (based on pendingRemovalCount)
- numberAllExecutors (based on executorCount)
ExecutorMonitor is used for the following:
- timedOutExecutors when ExecutorAllocationManager is requested to schedule
- executorCount when ExecutorAllocationManager is requested to addExecutors
- executorCount, pendingRemovalCount and executorsKilled when ExecutorAllocationManager is requested to removeExecutors
ExecutorAllocationListener
ExecutorAllocationManager creates an ExecutorAllocationListener when created to intercept Spark events that impact the allocation policy.
ExecutorAllocationListener is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorAllocationListener is used to calculate the maximum number of executors needed.
spark.dynamicAllocation.executorAllocationRatio
ExecutorAllocationManager uses the spark.dynamicAllocation.executorAllocationRatio configuration property to calculate maxNumExecutorsNeeded.
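tasksPerExecutorForFullParallelism
ExecutorAllocationManager uses the spark.executor.cores and spark.task.cpus configuration properties to calculate the number of tasks that can run concurrently on a single executor at full parallelism (spark.executor.cores divided by spark.task.cpus).
Used when:
- validateSettings validates the configuration (the value must not be zero)
- maxNumExecutorsNeeded is calculated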
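This is a minimal sketch of the computation, assuming plain integer division of the two properties. The object and method names are hypothetical.

```scala
object FullParallelismSketch {
  // Number of tasks one executor can run at the same time:
  // spark.executor.cores / spark.task.cpus, using integer division.
  // A result of 0 means a single task needs more CPUs than an executor has.
  def tasksPerExecutorForFullParallelism(executorCores: Int, taskCpus: Int): Int = {
    require(taskCpus > 0, "spark.task.cpus must be positive")
    executorCores / taskCpus
  }
}
```

With 4 executor cores and 1 CPU per task, an executor runs 4 tasks at once. With 3 cores and 2 CPUs per task, it runs only 1, and one core stays idle. With 1 core and 2 CPUs per task, the result is 0, which validateSettings rejects.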
Maximum Number of Executors Needed
maxNumExecutorsNeeded(): Int
maxNumExecutorsNeeded requests the ExecutorAllocationListener for the number of pending and running tasks.
maxNumExecutorsNeeded is the total number of pending and running tasks, multiplied by executorAllocationRatio and divided by tasksPerExecutorForFullParallelism, rounded up to the nearest integer.
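The formula can be written as a pure function. The parameter names here are hypothetical. In ExecutorAllocationManager, the task counts come from ExecutorAllocationListener and the other two inputs come from configuration.

```scala
object MaxExecutorsNeededSketch {
  // ceil((pending + running) * executorAllocationRatio / tasksPerExecutor)
  def maxNumExecutorsNeeded(
      pendingTasks: Int,
      runningTasks: Int,
      executorAllocationRatio: Double,
      tasksPerExecutorForFullParallelism: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    // Int * Double yields a Double, so the division below is not truncated.
    math.ceil(
      numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutorForFullParallelism
    ).toInt
  }
}
```

Take 100 pending and 10 running tasks with 4 tasks per executor. A ratio of 1.0 gives ceil(27.5) = 28 executors, and a ratio of 0.5 gives ceil(13.75) = 14. A ratio below 1.0 therefore asks for fewer executors than full parallelism would need.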
maxNumExecutorsNeeded is used for:
- updateAndSyncNumExecutorsTarget
- numberMaxNeededExecutors performance metric
ExecutorAllocationClient
ExecutorAllocationManager is given an ExecutorAllocationClient when created.
Starting ExecutorAllocationManager
start(): Unit
start requests the LiveListenerBus to add the following to the management queue:
- ExecutorAllocationListener
- ExecutorMonitor
start requests the ContextCleaner (if defined) to attach the ExecutorMonitor.
start creates a scheduleTask (a Java Runnable) that runs schedule.
start requests the ScheduledExecutorService to schedule the scheduleTask every 100 ms.
Note
The schedule delay of 100 ms is not configurable.
start requests the ExecutorAllocationClient to request the total executors with the following:
start is used when SparkContext is created.
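The scheduling part of start can be sketched with the JDK's ScheduledExecutorService. This is a hedged approximation, not Spark's code. It assumes a single daemon thread named spark-dynamic-executor-allocation and fixed-delay scheduling, whereas the text only says "every 100 ms". The try/catch matters because a ScheduledExecutorService suppresses all later runs of a periodic task once one run throws.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

object ScheduleLoopSketch {
  // Assumption: a single daemon thread named after the allocation executor.
  def newAllocationExecutor(): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "spark-dynamic-executor-allocation")
        t.setDaemon(true)
        t
      }
    })

  // Wraps `schedule` in a Runnable (the scheduleTask) and runs it with a
  // fixed 100 ms delay between the end of one run and the start of the next.
  def start(executor: ScheduledExecutorService, schedule: () => Unit): Unit = {
    val scheduleTask = new Runnable {
      override def run(): Unit =
        try schedule()
        catch {
          // Swallow non-fatal errors: an uncaught exception would cancel
          // every later run of this periodic task.
          case NonFatal(e) => System.err.println(s"Uncaught exception in schedule: $e")
        }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0L, 100L, TimeUnit.MILLISECONDS)
  }
}
```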
Scheduling Executors
schedule(): Unit
schedule requests the ExecutorMonitor for timedOutExecutors.
If there are executors to be removed, schedule turns the initializing internal flag off.
schedule then calls updateAndSyncNumExecutorsTarget with the current time.
In the end, schedule removes the timed-out executors, if there are any.
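The control flow of schedule can be sketched as follows. MonitorLike and the function parameters are hypothetical stand-ins for ExecutorMonitor, updateAndSyncNumExecutorsTarget and removeExecutors. Executors are modelled as (String, Int) pairs to match the removeExecutors signature. Making schedule synchronized is an assumption of this sketch.

```scala
// Hypothetical stand-in for ExecutorMonitor (not Spark's API).
trait MonitorLike {
  // Executors whose idle timeout has expired, as (executor ID, Int) pairs.
  def timedOutExecutors(): Seq[(String, Int)]
}

// Hypothetical sketch of schedule's control flow; the collaborators are
// passed in as functions so the logic can run without Spark.
final class ScheduleSketch(
    monitor: MonitorLike,
    updateAndSyncNumExecutorsTarget: Long => Int,
    removeExecutors: Seq[(String, Int)] => Seq[String],
    now: () => Long) {

  // Starts true and is turned off as soon as some executor times out.
  @volatile var initializing: Boolean = true

  def schedule(): Unit = synchronized {
    val executorsToRemove = monitor.timedOutExecutors()
    if (executorsToRemove.nonEmpty) initializing = false
    // The target is updated on every tick, whether or not anything is removed.
    updateAndSyncNumExecutorsTarget(now())
    if (executorsToRemove.nonEmpty) removeExecutors(executorsToRemove)
  }
}
```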
updateAndSyncNumExecutorsTarget
updateAndSyncNumExecutorsTarget(now: Long): Int
updateAndSyncNumExecutorsTarget calculates maxNumExecutorsNeeded.
updateAndSyncNumExecutorsTarget...FIXME
Stopping ExecutorAllocationManager
stop(): Unit
stop shuts down the spark-dynamic-executor-allocation allocation executor.
Note
stop waits up to 10 seconds for the termination to complete.
stop is used when SparkContext is requested to stop.
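This sketch of stop works over any JDK ExecutorService. It assumes the allocation executor is stopped with shutdown followed by a bounded awaitTermination. The Boolean result is an addition of this sketch and is not part of the described API. It reports whether termination completed within the 10 seconds.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

object StopSketch {
  // Shut the executor down so that no new scheduleTask runs start,
  // then wait up to 10 seconds for termination.
  // Returns true if the executor terminated within the timeout.
  def stop(executor: ExecutorService): Boolean = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

By default, a scheduled thread pool cancels its periodic tasks on shutdown. The wait is therefore short unless a schedule run is still in flight.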
spark-dynamic-executor-allocation Allocation Executor
spark-dynamic-executor-allocation allocation executor is a...FIXME
ExecutorAllocationManagerSource
Removing Executors
removeExecutors(
executors: Seq[(String, Int)]): Seq[String]
removeExecutors...FIXME
removeExecutors is used when:
- ExecutorAllocationManager is requested to schedule executors
Logging
Enable ALL logging level for org.apache.spark.ExecutorAllocationManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.ExecutorAllocationManager=ALL
Refer to Logging.