ExecutorAllocationManager — Allocation Manager for Spark Core

ExecutorAllocationManager is responsible for dynamically allocating and removing executors based on the workload.

It intercepts Spark events using the internal ExecutorAllocationListener, which keeps track of the workload (changing the <> that the allocation manager uses for executor management).

It uses ExecutorAllocationClient, [], and SparkConf (all of which are passed in when ExecutorAllocationManager is created).

ExecutorAllocationManager is created when SparkContext is created.

NOTE: SparkContext expects that SchedulerBackend follows the ExecutorAllocationClient contract when dynamic allocation of executors is enabled.

[[internal-properties]]
.ExecutorAllocationManager's Internal Properties
[cols="1,1,2",options="header",width="100%"]
|===
| Name
| Initial Value
| Description

| [[executorAllocationManagerSource]] executorAllocationManagerSource
| ExecutorAllocationManagerSource
| FIXME

| tasksPerExecutorForFullParallelism
|
a| [[tasksPerExecutorForFullParallelism]]
|===

[[internal-registries]]
.ExecutorAllocationManager's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| [[executorsPendingToRemove]] executorsPendingToRemove
| Internal cache with...FIXME

Used when...FIXME

| [[removeTimes]] removeTimes
| Internal cache with...FIXME

Used when...FIXME

| [[executorIds]] executorIds
| Internal cache with...FIXME

Used when...FIXME

| [[initialNumExecutors]] initialNumExecutors
| FIXME

| [[numExecutorsTarget]] numExecutorsTarget
| FIXME

| [[numExecutorsToAdd]] numExecutorsToAdd
| FIXME

| [[initializing]] initializing
| Flag whether...FIXME

Starts enabled (i.e. true).
|===



Enable INFO logging level for org.apache.spark.ExecutorAllocationManager logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

=== [[addExecutors]] addExecutors Method


=== [[removeExecutor]] removeExecutor Method


=== [[maxNumExecutorsNeeded]] maxNumExecutorsNeeded Method


=== [[start]] Starting ExecutorAllocationManager -- start Method

[source, scala]
----
start(): Unit
----

start registers ExecutorAllocationListener (with []) to monitor scheduler events and decide when to add and remove executors. It then immediately starts <>, which is responsible for the <> every 100 milliseconds.

NOTE: 100 milliseconds for the period between successive <> is fixed, i.e. not configurable.

It requests executors using the input ExecutorAllocationClient. The number of executors requested is spark.dynamicAllocation.initialExecutors.

start is used when SparkContext is created.
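The fixed 100-millisecond period described above can be sketched with a plain java.util.concurrent scheduler. This is a minimal sketch rather than Spark's own code: PeriodicScheduleSketch, runFor and the tick counter are hypothetical names, and using scheduleWithFixedDelay on a single-thread scheduler is an assumption about how such a loop could be built.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the allocation manager's scheduling loop.
object PeriodicScheduleSketch {
  // Fixed period between successive runs (not configurable, as noted above).
  val intervalMillis = 100L

  // Runs `task` every intervalMillis for roughly `durationMillis`
  // and returns how many times it ran.
  def runFor(durationMillis: Long)(task: () => Unit): Int = {
    val ticks = new AtomicInteger(0)
    val executor = Executors.newSingleThreadScheduledExecutor()
    val runnable = new Runnable {
      override def run(): Unit = {
        task()
        ticks.incrementAndGet()
      }
    }
    // Initial delay of 0: the first run happens immediately.
    executor.scheduleWithFixedDelay(runnable, 0L, intervalMillis, TimeUnit.MILLISECONDS)
    Thread.sleep(durationMillis)
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
    ticks.get()
  }
}
```

Calling PeriodicScheduleSketch.runFor(350)(() => println("schedule")) prints a handful of lines; the exact count depends on thread timing.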

=== [[schedule]] Scheduling Executors -- schedule Method

[source, scala]
----
schedule(): Unit
----

schedule calls <> to...FIXME

It then goes over <> to remove expired executors, i.e. executors whose expiration time has elapsed.
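The expiry check can be illustrated with a small, dependency-free sketch. It assumes the registry is a map from executor ID to expiration time in milliseconds (the removeTimes name is borrowed from the registries table; whether schedule uses it exactly this way is an assumption), and that an executor counts as expired once the current time reaches its expiration time. ExpiredExecutorsSketch and partitionExpired are hypothetical names.

```scala
// Hypothetical helper: splits executors into expired and still-active ones.
object ExpiredExecutorsSketch {
  // removeTimes maps an executor ID to the time (in ms) at which it expires.
  def partitionExpired(
      removeTimes: Map[String, Long],
      now: Long): (Set[String], Map[String, Long]) = {
    // Assumption: "elapsed" means now >= expiration time.
    val (expired, active) = removeTimes.partition { case (_, expireTime) => now >= expireTime }
    (expired.keySet, active)
  }
}
```

For example, with Map("exec-1" -> 1000L, "exec-2" -> 5000L) and now = 2000L, only exec-1 is reported as expired and exec-2 stays in the active map.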

=== [[updateAndSyncNumExecutorsTarget]] updateAndSyncNumExecutorsTarget Method

[source, scala]
----
updateAndSyncNumExecutorsTarget(now: Long): Int
----


If ExecutorAllocationManager is <>, it returns 0.

=== [[reset]] Resetting ExecutorAllocationManager -- reset Method

[source, scala]
----
reset(): Unit
----

reset resets ExecutorAllocationManager to its initial state, i.e.

  1. <> is enabled (i.e. true).
  2. The <> is set to <>.
  3. The <> is set to 1.
  4. All <> are cleared.
  5. All <> are cleared.
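The five steps above can be sketched as a small state holder. This is an illustration only: AllocationStateSketch is a hypothetical class, and the mapping of each step to a field (initializing, numExecutorsTarget, initialNumExecutors, numExecutorsToAdd, executorsPendingToRemove, removeTimes) is an assumption based on the names in the registries table.

```scala
import scala.collection.mutable

// Hypothetical model of the state that reset restores; not Spark's class.
class AllocationStateSketch(val initialNumExecutors: Int) {
  var initializing: Boolean = true
  var numExecutorsTarget: Int = initialNumExecutors
  var numExecutorsToAdd: Int = 1
  val executorsPendingToRemove = mutable.Set.empty[String]
  val removeTimes = mutable.Map.empty[String, Long]

  // Restores every field to its initial value (assumed mapping of steps 1-5).
  def reset(): Unit = synchronized {
    initializing = true
    numExecutorsTarget = initialNumExecutors
    numExecutorsToAdd = 1
    executorsPendingToRemove.clear()
    removeTimes.clear()
  }
}
```

After any sequence of changes to these fields, calling reset() brings the object back to the state it had right after construction.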

=== [[stop]] Stopping ExecutorAllocationManager -- stop Method

[source, scala]
----
stop(): Unit
----

stop shuts down <>.

NOTE: stop waits 10 seconds for the termination to be complete.
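A shutdown with a bounded wait can be sketched with the standard java.util.concurrent API. This is a minimal sketch, assuming the component being stopped is a java.util.concurrent.ExecutorService; StopSketch is a hypothetical name and not part of Spark.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper that mirrors the shutdown-then-wait behavior described above.
object StopSketch {
  // Stops accepting new tasks, then waits up to 10 seconds for running tasks to finish.
  // Returns true if the executor terminated within the timeout.
  def stop(executor: ExecutorService): Boolean = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With an idle executor the call returns almost immediately; the 10 seconds are an upper bound on the wait, not a fixed delay.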

=== [[creating-instance]] Creating ExecutorAllocationManager Instance

ExecutorAllocationManager takes the following when created:

* [[client]] ExecutorAllocationClient
* [[listenerBus]][]
* [[conf]] SparkConf

ExecutorAllocationManager initializes the <>.

=== [[validateSettings]] Validating Configuration of Dynamic Allocation -- validateSettings Internal Method

[source, scala]
----
validateSettings(): Unit
----

validateSettings makes sure that the settings for dynamic allocation are correct.

validateSettings checks the following conditions and throws a SparkException if any of them does not hold:

. <> must be positive

. <> must be 0 or greater

. <> must be less than or equal to <>

. <> must be greater than 0

. spark.shuffle.service.enabled must be enabled.

. The number of tasks per executor, i.e. spark.executor.cores divided by spark.task.cpus, must not be zero.

NOTE: validateSettings is used when <<creating-instance, ExecutorAllocationManager is created>>.
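The two checks in the list that name their settings explicitly (the shuffle service flag and the ratio of spark.executor.cores to spark.task.cpus) can be sketched without any Spark dependency. This is an illustration under stated assumptions: the configuration is modeled as a plain Map, SettingsException is a hypothetical stand-in for SparkException, the default values are chosen for the sketch only, and the error messages are made up.

```scala
object ValidateSettingsSketch {
  // Hypothetical stand-in for SparkException, to keep the sketch dependency-free.
  final class SettingsException(message: String) extends Exception(message)

  // Checks the shuffle service flag and the cores-to-CPUs ratio.
  def validate(conf: Map[String, String]): Unit = {
    // Assumed default for this sketch: shuffle service disabled.
    val shuffleServiceEnabled =
      conf.getOrElse("spark.shuffle.service.enabled", "false").toBoolean
    if (!shuffleServiceEnabled) {
      throw new SettingsException("spark.shuffle.service.enabled must be enabled")
    }
    // Assumed defaults for this sketch: 1 core per executor, 1 CPU per task.
    val executorCores = conf.getOrElse("spark.executor.cores", "1").toInt
    val taskCpus = conf.getOrElse("spark.task.cpus", "1").toInt
    // Integer division: e.g. 1 core / 2 CPUs per task gives 0.
    val ratio = executorCores / taskCpus
    if (ratio == 0) {
      throw new SettingsException("spark.executor.cores must not be less than spark.task.cpus")
    }
  }
}
```

A configuration with 4 executor cores and 2 CPUs per task passes, while 1 executor core with 2 CPUs per task fails because the integer division yields zero.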

=== [[spark-dynamic-executor-allocation]] spark-dynamic-executor-allocation Allocation Executor

spark-dynamic-executor-allocation allocation executor is a...FIXME

It is started...

It is stopped...

Last update: 2020-10-08