
ExecutorAllocationManager

ExecutorAllocationManager can be used to dynamically allocate executors based on processing workload.

ExecutorAllocationManager intercepts Spark events using the internal ExecutorAllocationListener that keeps track of the workload.

Creating Instance

ExecutorAllocationManager takes the following to be created:

ExecutorAllocationManager is created (and started) when SparkContext is created (with Dynamic Allocation of Executors enabled).
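Dynamic Allocation of Executors is switched on through the spark.dynamicAllocation.enabled configuration property, which defaults to false. The following minimal sketch models that check. It assumes the configuration is a plain Map rather than Spark's SparkConf, and the DynamicAllocationCheck object is hypothetical. Spark's actual check may consider more than this one property (for example, the master URL).

```scala
// Hypothetical helper: models the configuration as a plain Map instead of SparkConf.
object DynamicAllocationCheck {
  val EnabledKey = "spark.dynamicAllocation.enabled"

  // The property defaults to false, so a missing key means "disabled".
  def isEnabled(conf: Map[String, String]): Boolean =
    conf.get(EnabledKey).exists(_.trim.equalsIgnoreCase("true"))
}
```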

Validating Configuration

validateSettings(): Unit

validateSettings makes sure that the settings for dynamic allocation are correct.

validateSettings throws a SparkException when the following are not met:
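A simplified sketch of range checks in the spirit of validateSettings could look as follows. The DynAllocSettings class, its field names, and the exact set of checks are assumptions for illustration. The sketch uses require, which throws IllegalArgumentException in place of SparkException, so that it runs without Spark on the classpath.

```scala
object SettingsValidation {
  // Hypothetical settings holder; field names are illustrative, not Spark's.
  final case class DynAllocSettings(
      minExecutors: Int,
      maxExecutors: Int,
      executorAllocationRatio: Double)

  // Throws IllegalArgumentException (standing in for SparkException)
  // when a setting is out of range.
  def validate(s: DynAllocSettings): Unit = {
    require(s.minExecutors >= 0 && s.maxExecutors >= 0,
      "min/max executors must not be negative")
    require(s.maxExecutors != 0, "max executors cannot be 0")
    require(s.minExecutors <= s.maxExecutors,
      "min executors must be less than or equal to max executors")
    require(s.executorAllocationRatio > 0.0 && s.executorAllocationRatio <= 1.0,
      "executorAllocationRatio must be > 0 and <= 1.0")
  }
}
```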

Performance Metrics

ExecutorAllocationManager uses ExecutorAllocationManagerSource for performance metrics.

ExecutorMonitor

ExecutorAllocationManager creates an ExecutorMonitor when created.

ExecutorMonitor is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.

ExecutorMonitor is attached (to the ContextCleaner) when ExecutorAllocationManager is started.

ExecutorMonitor is requested to reset when ExecutorAllocationManager is requested to reset.

ExecutorMonitor is used for the performance metrics:

ExecutorMonitor is used for the following:

ExecutorAllocationListener

ExecutorAllocationManager creates an ExecutorAllocationListener when created to intercept Spark events that impact the allocation policy.

ExecutorAllocationListener is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.

ExecutorAllocationListener is used to calculate the maximum number of executors needed.
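The following is a simplified, hypothetical model of the bookkeeping behind "pending and running tasks". It shows how stage and task events could move tasks from pending to running and then to finished. The TaskCounter class and its method names are assumptions that only loosely mirror SparkListener callbacks. Spark's real listener tracks considerably more state.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of task bookkeeping; not Spark's listener.
final class TaskCounter {
  private val pendingByStage = mutable.Map.empty[Int, Int]
  private var running = 0

  // A stage was submitted with numTasks tasks that are now pending.
  def onStageSubmitted(stageId: Int, numTasks: Int): Unit =
    pendingByStage(stageId) = pendingByStage.getOrElse(stageId, 0) + numTasks

  // A task of the stage started: one pending task becomes running.
  def onTaskStart(stageId: Int): Unit = {
    pendingByStage.get(stageId).foreach { n =>
      if (n > 0) pendingByStage(stageId) = n - 1
    }
    running += 1
  }

  // A task finished: it is no longer running.
  def onTaskEnd(): Unit = {
    running = math.max(0, running - 1)
  }

  def totalPendingTasks: Int = pendingByStage.values.sum
  def totalRunningTasks: Int = running
}
```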

spark.dynamicAllocation.executorAllocationRatio

ExecutorAllocationManager uses spark.dynamicAllocation.executorAllocationRatio configuration property for maxNumExecutorsNeeded.

tasksPerExecutorForFullParallelism

ExecutorAllocationManager uses spark.executor.cores and spark.task.cpus configuration properties for the number of tasks that can be submitted to an executor for full parallelism.
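The sketch below computes the value, assuming it is the integer quotient of spark.executor.cores and spark.task.cpus. Newer Spark versions may derive it per resource profile instead. The FullParallelism object and the exact error messages are hypothetical.

```scala
object FullParallelism {
  // Tasks an executor can run at once: its cores divided by the CPUs each
  // task needs (integer division, as both settings are integers).
  def tasksPerExecutorForFullParallelism(executorCores: Int, taskCpus: Int): Int = {
    require(taskCpus > 0, "spark.task.cpus must be positive")
    val tasks = executorCores / taskCpus
    require(tasks > 0, "spark.executor.cores must not be less than spark.task.cpus")
    tasks
  }
}
```

For example, 4 cores with 1 CPU per task give 4 concurrent tasks. 5 cores with 2 CPUs per task give 2 because the leftover core cannot host a whole task.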

Used when:

Maximum Number of Executors Needed

maxNumExecutorsNeeded(): Int

maxNumExecutorsNeeded requests the ExecutorAllocationListener for the number of pending and running tasks.

maxNumExecutorsNeeded is the ceiling of the total number of pending and running tasks multiplied by executorAllocationRatio and divided by tasksPerExecutorForFullParallelism, i.e. the smallest integer greater than or equal to that quotient.
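The formula can be sketched as a pure function. Here the task counts and settings are passed in as parameters instead of being read from the ExecutorAllocationListener and the configuration, and the MaxExecutorsNeeded object is hypothetical.

```scala
object MaxExecutorsNeeded {
  // ceil((pending + running) * executorAllocationRatio / tasksPerExecutorForFullParallelism)
  def maxNumExecutorsNeeded(
      pendingTasks: Int,
      runningTasks: Int,
      executorAllocationRatio: Double,
      tasksPerExecutorForFullParallelism: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    math.ceil(
      numRunningOrPendingTasks * executorAllocationRatio /
        tasksPerExecutorForFullParallelism).toInt
  }
}
```

With 10 pending and running tasks and 4 tasks per executor, a ratio of 1.0 gives ceil(2.5) = 3 executors. A ratio of 0.5 gives ceil(1.25) = 2, trading some parallelism for fewer executors.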

maxNumExecutorsNeeded is used for:

ExecutorAllocationClient

ExecutorAllocationManager is given an ExecutorAllocationClient when created.

Starting ExecutorAllocationManager

start(): Unit

start requests the LiveListenerBus to add to the management queue:

start requests the ContextCleaner (if defined) to attach the ExecutorMonitor.

start creates a scheduleTask (a Java Runnable) that calls schedule.

start requests the ScheduledExecutorService to schedule the scheduleTask every 100 ms.

Note

The schedule interval of 100 ms is not configurable.
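A minimal sketch of this periodic scheduling uses only java.util.concurrent. The AllocationScheduling object and its method names are hypothetical. Using scheduleWithFixedDelay (rather than scheduleAtFixedRate) and catching non-fatal errors inside the task are assumptions of this sketch. The thread name follows the spark-dynamic-executor-allocation allocation executor.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical helpers modelling the periodic scheduling described above.
object AllocationScheduling {
  // Fixed (non-configurable) interval between runs.
  val IntervalMillis: Long = 100L

  // Single-threaded scheduler whose daemon thread carries the
  // spark-dynamic-executor-allocation name.
  def newAllocationExecutor(): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "spark-dynamic-executor-allocation")
        t.setDaemon(true)
        t
      }
    })

  // Runs `body` right away and then again 100 ms after each run completes.
  def startSchedule(executor: ScheduledExecutorService)(body: => Unit): Unit = {
    val scheduleTask = new Runnable {
      override def run(): Unit =
        try body
        catch {
          // A periodic task that throws suppresses all later runs, so report and carry on.
          case NonFatal(e) => e.printStackTrace()
        }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0L, IntervalMillis, TimeUnit.MILLISECONDS)
  }
}
```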

start requests the ExecutorAllocationClient to request the total executors with the following:

start is used when SparkContext is created.

Scheduling Executors

schedule(): Unit

schedule requests the ExecutorMonitor for timedOutExecutors.

If there are executors to be removed, schedule turns the initializing internal flag off.

schedule calls updateAndSyncNumExecutorsTarget with the current time.

In the end, schedule removes the executors to be removed if there are any.
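The steps of schedule can be sketched with hypothetical stand-ins for its collaborators. IdleMonitor plays the role of ExecutorMonitor, and functions replace the clock, updateAndSyncNumExecutorsTarget and removeExecutors. All names are assumptions, and only the order of the steps described above is modelled.

```scala
// Hypothetical stand-in for ExecutorMonitor.
trait IdleMonitor {
  // Timed-out executors, in the same (String, Int) shape as removeExecutors' parameter.
  def timedOutExecutors(): Seq[(String, Int)]
}

// Hypothetical model of schedule(); collaborators are injected as functions.
final class ScheduleModel(
    monitor: IdleMonitor,
    clock: () => Long,
    updateAndSyncNumExecutorsTarget: Long => Int,
    removeExecutors: Seq[(String, Int)] => Seq[String]) {

  @volatile var initializing: Boolean = true

  def schedule(): Unit = synchronized {
    val executorIdsToBeRemoved = monitor.timedOutExecutors()
    // Turn the initializing flag off once there are executors to remove.
    if (executorIdsToBeRemoved.nonEmpty) {
      initializing = false
    }
    // Recompute and sync the executor target with the current time.
    updateAndSyncNumExecutorsTarget(clock())
    // Finally, remove the timed-out executors (if any).
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }
}
```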

updateAndSyncNumExecutorsTarget

updateAndSyncNumExecutorsTarget(
  now: Long): Int

updateAndSyncNumExecutorsTarget computes the maximum number of executors needed (maxNumExecutorsNeeded).

updateAndSyncNumExecutorsTarget...FIXME

Stopping ExecutorAllocationManager

stop(): Unit

stop shuts down the spark-dynamic-executor-allocation allocation executor.

Note

stop waits 10 seconds for the termination to be complete.
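The shutdown-then-wait pattern can be sketched with java.util.concurrent alone. The AllocationShutdown object is hypothetical, and what Spark does when the timeout expires is not modelled here.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper mirroring the described stop behaviour.
object AllocationShutdown {
  // Stop accepting new work, then wait up to 10 seconds for running work to finish.
  // Returns true if the executor terminated within the timeout.
  def stop(executor: ExecutorService): Boolean = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```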

stop is used when SparkContext is requested to stop.

spark-dynamic-executor-allocation Allocation Executor

spark-dynamic-executor-allocation allocation executor is a...FIXME

ExecutorAllocationManagerSource


Removing Executors

removeExecutors(
  executors: Seq[(String, Int)]): Seq[String]

removeExecutors...FIXME

removeExecutors is used when:

Logging

Enable the ALL logging level for the org.apache.spark.ExecutorAllocationManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.ExecutorAllocationManager=ALL

Refer to Logging.