ExecutorAllocationManager¶
ExecutorAllocationManager can be used to dynamically allocate executors based on the processing workload.
ExecutorAllocationManager intercepts Spark events using the internal ExecutorAllocationListener that keeps track of the workload.
Creating Instance¶
ExecutorAllocationManager takes the following to be created:
- ExecutorAllocationClient
- LiveListenerBus
- SparkConf
- ContextCleaner (default: None)
- Clock (default: SystemClock)
ExecutorAllocationManager is created (and started) when SparkContext is created (with Dynamic Allocation of Executors enabled).
Validating Configuration¶
validateSettings(): Unit
validateSettings makes sure that the settings for dynamic allocation are correct.
validateSettings throws a SparkException when any of the following is not met:
- spark.dynamicAllocation.minExecutors must be 0 or greater
- spark.dynamicAllocation.maxExecutors must be greater than 0
- spark.dynamicAllocation.minExecutors must be less than or equal to spark.dynamicAllocation.maxExecutors
- spark.dynamicAllocation.executorIdleTimeout must be greater than 0
- spark.shuffle.service.enabled must be enabled
- The number of tasks per executor, i.e. spark.executor.cores divided by spark.task.cpus (integer division), must not be 0 (in other words, spark.executor.cores must not be less than spark.task.cpus)
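The checks above can be sketched as follows. This is a minimal sketch, not Spark's implementation: it assumes the settings come from a plain Map[String, String], uses illustrative defaults, treats spark.dynamicAllocation.executorIdleTimeout as a plain number of seconds (Spark itself accepts time strings such as 60s), and signals failures with require (IllegalArgumentException) instead of SparkException. The object and helper names are hypothetical.

```scala
// Hypothetical sketch of the validateSettings rules (not Spark's code).
// Assumptions: plain Map-based settings, illustrative defaults,
// idle timeout given in seconds, IllegalArgumentException instead of SparkException.
object ValidateSettingsSketch {
  private def int(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  def validate(conf: Map[String, String]): Unit = {
    val minExecutors = int(conf, "spark.dynamicAllocation.minExecutors", 0)
    val maxExecutors = int(conf, "spark.dynamicAllocation.maxExecutors", Int.MaxValue)
    val idleTimeoutS = conf.get("spark.dynamicAllocation.executorIdleTimeout")
      .map(_.trim.toLong).getOrElse(60L)
    val shuffleServiceEnabled = conf.get("spark.shuffle.service.enabled").exists(_.trim.toBoolean)
    val executorCores = int(conf, "spark.executor.cores", 1)
    val taskCpus = int(conf, "spark.task.cpus", 1)

    require(minExecutors >= 0, "spark.dynamicAllocation.minExecutors must be 0 or greater")
    require(maxExecutors > 0, "spark.dynamicAllocation.maxExecutors must be greater than 0")
    require(minExecutors <= maxExecutors,
      s"minExecutors ($minExecutors) must be less than or equal to maxExecutors ($maxExecutors)")
    require(idleTimeoutS > 0, "spark.dynamicAllocation.executorIdleTimeout must be greater than 0")
    require(shuffleServiceEnabled, "spark.shuffle.service.enabled must be enabled")
    // Integer division: how many tasks fit on one executor at the same time
    require(executorCores / taskCpus != 0, "spark.executor.cores must not be less than spark.task.cpus")
  }
}
```

For example, a configuration with spark.executor.cores of 1 and spark.task.cpus of 2 fails the last check because 1 / 2 is 0 in integer division.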
Performance Metrics¶
ExecutorAllocationManager uses ExecutorAllocationManagerSource for performance metrics.
ExecutorMonitor¶
ExecutorAllocationManager creates an ExecutorMonitor when created.
ExecutorMonitor is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorMonitor is attached (to the ContextCleaner) when ExecutorAllocationManager is started.
ExecutorMonitor is requested to reset when ExecutorAllocationManager is requested to reset.
ExecutorMonitor is used for the performance metrics:
- numberExecutorsPendingToRemove (based on pendingRemovalCount)
- numberAllExecutors (based on executorCount)
ExecutorMonitor is used for the following:
- timedOutExecutors when ExecutorAllocationManager is requested to schedule
- executorCount when ExecutorAllocationManager is requested to addExecutors
- executorCount, pendingRemovalCount and executorsKilled when ExecutorAllocationManager is requested to removeExecutors
ExecutorAllocationListener¶
ExecutorAllocationManager creates an ExecutorAllocationListener when created to intercept Spark events that impact the allocation policy.
ExecutorAllocationListener is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorAllocationListener is used to calculate the maximum number of executors needed.
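To make "keeps track of the workload" concrete, here is a simplified sketch of how a listener could derive the number of pending and running tasks from stage and task events. The event classes and the WorkloadTracker name are hypothetical; the real ExecutorAllocationListener reacts to SparkListener events and tracks considerably more state (e.g. speculative tasks and locality preferences).

```scala
import scala.collection.mutable

// Hypothetical, simplified events (not Spark's SparkListener event types)
sealed trait Event
final case class StageSubmitted(stageId: Int, numTasks: Int) extends Event
final case class TaskStarted(stageId: Int) extends Event
final case class TaskEnded(stageId: Int) extends Event
final case class StageCompleted(stageId: Int) extends Event

// Hypothetical tracker that counts pending and running tasks per active stage
class WorkloadTracker {
  private val tasksPerStage = mutable.Map.empty[Int, Int]
  private val startedPerStage = mutable.Map.empty[Int, Int]
  private val endedPerStage = mutable.Map.empty[Int, Int]

  def onEvent(event: Event): Unit = event match {
    case StageSubmitted(id, n) => tasksPerStage(id) = n
    case TaskStarted(id) => startedPerStage(id) = startedPerStage.getOrElse(id, 0) + 1
    case TaskEnded(id) => endedPerStage(id) = endedPerStage.getOrElse(id, 0) + 1
    case StageCompleted(id) =>
      // A completed stage no longer contributes to the workload
      tasksPerStage -= id
      startedPerStage -= id
      endedPerStage -= id
  }

  // Tasks of active stages that have not started yet
  def totalPendingTasks: Int =
    tasksPerStage.iterator.map { case (id, n) => n - startedPerStage.getOrElse(id, 0) }.sum

  // Tasks that have started but not ended yet
  def totalRunningTasks: Int =
    startedPerStage.iterator.map { case (id, s) => s - endedPerStage.getOrElse(id, 0) }.sum
}
```

For a stage with 10 tasks where 2 tasks started and 1 ended, the tracker reports 8 pending and 1 running task.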
spark.dynamicAllocation.executorAllocationRatio¶
ExecutorAllocationManager uses the spark.dynamicAllocation.executorAllocationRatio configuration property for maxNumExecutorsNeeded.
tasksPerExecutorForFullParallelism¶
ExecutorAllocationManager uses the spark.executor.cores and spark.task.cpus configuration properties for the number of tasks that can be submitted to an executor for full parallelism (spark.executor.cores divided by spark.task.cpus).
Used for:
- validateSettings
- maxNumExecutorsNeeded
Maximum Number of Executors Needed¶
maxNumExecutorsNeeded(): Int
maxNumExecutorsNeeded requests the ExecutorAllocationListener for the number of pending and running tasks.
maxNumExecutorsNeeded is the ceiling (the smallest integer greater than or equal to) of the total number of pending and running tasks multiplied by executorAllocationRatio and divided by tasksPerExecutorForFullParallelism.
maxNumExecutorsNeeded is used for:
- updateAndSyncNumExecutorsTarget
- numberMaxNeededExecutors performance metric
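The maxNumExecutorsNeeded formula (together with tasksPerExecutorForFullParallelism) can be sketched as follows. This is a minimal sketch: plain parameters stand in for the ExecutorAllocationListener counters and the configuration properties, and the object and parameter names are hypothetical.

```scala
// Hypothetical sketch of the maxNumExecutorsNeeded arithmetic (not Spark's code)
object MaxExecutorsNeededSketch {
  // spark.executor.cores / spark.task.cpus (integer division)
  def tasksPerExecutorForFullParallelism(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  def maxNumExecutorsNeeded(
      pendingTasks: Int,
      runningTasks: Int,
      executorAllocationRatio: Double,
      tasksPerExecutor: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    // Round up so that a partially filled executor still counts as one executor
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt
  }
}
```

For example, with spark.executor.cores of 4 and spark.task.cpus of 1 (4 tasks per executor), 13 pending and running tasks need ceil(13 × 1.0 / 4) = 4 executors, or ceil(13 × 0.5 / 4) = 2 executors with an executorAllocationRatio of 0.5.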
ExecutorAllocationClient¶
ExecutorAllocationManager is given an ExecutorAllocationClient when created.
Starting ExecutorAllocationManager¶
start(): Unit
start requests the LiveListenerBus to add the following to the management queue:
- ExecutorAllocationListener
- ExecutorMonitor
start requests the ContextCleaner (if defined) to attach the ExecutorMonitor.
start creates a scheduleTask (a Java Runnable) that runs schedule.
start requests the ScheduledExecutorService (the spark-dynamic-executor-allocation allocation executor) to schedule the scheduleTask every 100 ms.
Note
The schedule delay of 100 ms is not configurable.
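The scheduling step can be sketched with plain java.util.concurrent classes. This is a minimal sketch under assumptions: a single daemon thread created through Executors.newSingleThreadScheduledExecutor and a custom ThreadFactory stands in for the allocation executor (Spark uses its own internal helper), and the schedule callback is a hypothetical parameter.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical sketch of running schedule every 100 ms on one daemon thread
object ScheduleLoopSketch {
  val intervalMillis: Long = 100L // fixed delay between runs

  def newAllocationExecutor(): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "spark-dynamic-executor-allocation")
        t.setDaemon(true) // do not keep the JVM alive
        t
      }
    })

  def start(executor: ScheduledExecutorService, schedule: () => Unit): Unit = {
    val scheduleTask = new Runnable {
      override def run(): Unit =
        try schedule()
        catch {
          // scheduleWithFixedDelay suppresses later runs once a run throws,
          // so non-fatal errors are caught to keep the loop alive
          case NonFatal(e) => println(s"Uncaught exception in schedule: $e")
        }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }
}
```

A fixed delay (rather than a fixed rate) means a slow schedule run pushes back the next one instead of letting runs pile up.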
start requests the ExecutorAllocationClient to request the total executors with the following:
start is used when SparkContext is created.
Scheduling Executors¶
schedule(): Unit
schedule requests the ExecutorMonitor for timedOutExecutors.
If there are executors to be removed, schedule turns the initializing internal flag off.
schedule then calls updateAndSyncNumExecutorsTarget with the current time.
In the end, schedule removes the timed-out executors (if there are any) using removeExecutors.
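The steps of schedule can be sketched as follows. The ExecutorMonitorLike trait, the ScheduleSketch class and the function parameters are hypothetical stand-ins for ExecutorMonitor, updateAndSyncNumExecutorsTarget and removeExecutors; the current time is passed in explicitly (an assumption for testability) instead of being read from a Clock.

```scala
// Hypothetical stand-in for ExecutorMonitor.timedOutExecutors;
// the (executor id, Int) pair shape follows removeExecutors' signature
trait ExecutorMonitorLike {
  def timedOutExecutors(): Seq[(String, Int)]
}

class ScheduleSketch(
    monitor: ExecutorMonitorLike,
    updateAndSyncNumExecutorsTarget: Long => Int,
    removeExecutors: Seq[(String, Int)] => Seq[String]) {

  @volatile var initializing: Boolean = true

  def schedule(now: Long): Unit = synchronized {
    val executorIdsToBeRemoved = monitor.timedOutExecutors()
    if (executorIdsToBeRemoved.nonEmpty) {
      // An idle timeout has elapsed, so stop treating the application as initializing
      initializing = false
    }
    // The executor target is updated before any timed-out executors are removed
    updateAndSyncNumExecutorsTarget(now)
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }
}
```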
updateAndSyncNumExecutorsTarget¶
updateAndSyncNumExecutorsTarget(
now: Long): Int
updateAndSyncNumExecutorsTarget calculates maxNumExecutorsNeeded.
updateAndSyncNumExecutorsTarget ...FIXME
Stopping ExecutorAllocationManager¶
stop(): Unit
stop shuts down the spark-dynamic-executor-allocation allocation executor.
Note
stop waits up to 10 seconds for the termination to complete.
stop is used when SparkContext is requested to stop.
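The shutdown-and-wait pattern can be sketched with a plain ScheduledExecutorService. This is a minimal sketch; the StopSketch name is hypothetical, and returning the result of awaitTermination is an addition for illustration.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

// Hypothetical sketch of stop: shut down, then wait up to 10 seconds
object StopSketch {
  def stop(executor: ScheduledExecutorService): Boolean = {
    // Reject new tasks; by default, periodic tasks are cancelled on shutdown
    executor.shutdown()
    // true if the executor terminated within the timeout
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```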
spark-dynamic-executor-allocation Allocation Executor¶
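spark-dynamic-executor-allocation allocation executor is the ScheduledExecutorService that runs the scheduleTask every 100 ms once ExecutorAllocationManager is started, and that is shut down when ExecutorAllocationManager is stopped.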
ExecutorAllocationManagerSource¶
Removing Executors¶
removeExecutors(
executors: Seq[(String, Int)]): Seq[String]
removeExecutors ...FIXME
removeExecutors is used when:
- ExecutorAllocationManager is requested to schedule executors
Logging¶
Enable ALL logging level for the org.apache.spark.ExecutorAllocationManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.ExecutorAllocationManager=ALL
Refer to Logging.