Dynamic Allocation (of Executors)

Dynamic Allocation (of Executors) (aka Elastic Scaling) is a Spark feature that allows for adding or removing Spark executors dynamically to match the workload.

Unlike the "traditional" static allocation, where a Spark application reserves CPU and memory resources upfront (irrespective of how much it eventually uses), dynamic allocation gives an application as many executors as it needs and no more. It scales the number of executors up and down with the workload: idle executors are removed, and when there are pending tasks waiting for executors to run on, dynamic allocation requests more executors.

Dynamic allocation is enabled using the spark.dynamicAllocation.enabled setting. When enabled, it is assumed that the External Shuffle Service is also used (it is not used by default, as controlled by the spark.shuffle.service.enabled property).

ExecutorAllocationManager is responsible for dynamic allocation of executors. With dynamic allocation enabled, it is started when SparkContext is initialized.

Dynamic allocation reports its current state using the ExecutorAllocationManager metric source.

Dynamic Allocation comes with the following policies for scaling executors up and down:

  1. Scale Up Policy requests new executors when there are pending tasks. The number of executors requested grows exponentially: it starts small, since the Spark application may need only slightly more executors, and ramps up quickly if the backlog persists.

  2. Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.
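The two policies can be modeled in a few lines of plain Scala. This is a minimal sketch under simplifying assumptions, not ExecutorAllocationManager's actual code: the hypothetical ScalingModel object doubles the number of executors it adds in each round while tasks stay pending (1, 2, 4, ...), capped at a maximum, and selects for removal every executor that has been idle for at least the idle timeout.

```scala
// Hypothetical, simplified model of the scale-up and scale-down policies.
// Not Spark's ExecutorAllocationManager; all names are illustrative only.
object ScalingModel {

  /** Target executor counts after each scale-up round while tasks stay pending.
    * Each round adds twice as many executors as the previous one (1, 2, 4, ...),
    * never exceeding maxExecutors. Long arithmetic avoids Int overflow.
    */
  def scaleUpTargets(start: Int, rounds: Int, maxExecutors: Int): List[Int] = {
    var target = start.toLong
    var toAdd = 1L
    val targets = List.newBuilder[Int]
    for (_ <- 1 to rounds) {
      target = math.min(target + toAdd, maxExecutors.toLong)
      targets += target.toInt
      // Double the increment for the next round, capped to keep it bounded.
      toAdd = math.min(toAdd * 2, maxExecutors.toLong)
    }
    targets.result()
  }

  /** Executors to remove: those idle for at least idleTimeoutSec seconds.
    * idleSince maps an executor id to the second at which it became idle.
    */
  def idleToRemove(idleSince: Map[String, Long], nowSec: Long, idleTimeoutSec: Long): Set[String] =
    idleSince.filter { case (_, since) => nowSec - since >= idleTimeoutSec }.keySet
}
```

For example, starting from 0 executors with a cap of 10, five rounds of backlog yield the targets List(1, 3, 7, 10, 10); with a 60-second idle timeout, an executor idle since second 0 becomes removable at second 60.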

Dynamic allocation is available for all the currently-supported cluster managers, i.e. Spark Standalone, Hadoop YARN and Apache Mesos.

Review the excellent slide deck Dynamic Allocation in Spark from Databricks.

Is Dynamic Allocation Enabled? — Utils.isDynamicAllocationEnabled Method

isDynamicAllocationEnabled(conf: SparkConf): Boolean

isDynamicAllocationEnabled returns true if both of the following conditions hold:

  1. spark.dynamicAllocation.enabled is enabled (i.e. true)

  2. Either Spark on cluster is used (i.e. spark.master is non-local) or spark.dynamicAllocation.testing is enabled (i.e. true)

Otherwise, isDynamicAllocationEnabled returns false.

isDynamicAllocationEnabled returns true, i.e. dynamic allocation is enabled, in Spark local mode for testing only (with spark.dynamicAllocation.testing enabled).

isDynamicAllocationEnabled is used when Spark calculates the initial number of executors for the coarse-grained scheduler backends of YARN, Spark Standalone, and Mesos. It is also used in Spark Streaming.
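The conditions can be expressed as a small Scala function. The sketch below assumes that a plain Map[String, String] stands in for SparkConf and that a master URL of local or local[...] counts as local mode; the DynAllocCheck object is hypothetical and only mirrors the logic described above.

```scala
// Hypothetical model of the isDynamicAllocationEnabled logic.
// Assumption: a plain Map[String, String] stands in for SparkConf.
object DynAllocCheck {

  // A boolean setting counts as enabled only when its value is "true" (case-insensitive).
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Assumption: "local" and "local[...]" master URLs mean local mode.
  def isLocalMaster(conf: Map[String, String]): Boolean = {
    val master = conf.getOrElse("spark.master", "")
    master == "local" || master.startsWith("local[")
  }

  // Enabled only if spark.dynamicAllocation.enabled is true AND either the
  // master is non-local or spark.dynamicAllocation.testing is true.
  def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean =
    flag(conf, "spark.dynamicAllocation.enabled") &&
      (!isLocalMaster(conf) || flag(conf, "spark.dynamicAllocation.testing"))
}
```

With spark.master set to local[*], the check returns true only if spark.dynamicAllocation.testing is also true.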

Enable WARN logging level for org.apache.spark.util.Utils logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.Utils=WARN

Refer to Logging.

Programmable Dynamic Allocation

Getting Initial Number of Executors for Dynamic Allocation — Utils.getDynamicAllocationInitialExecutors Method

getDynamicAllocationInitialExecutors(conf: SparkConf): Int

getDynamicAllocationInitialExecutors first makes sure that spark.dynamicAllocation.initialExecutors is equal to or greater than spark.dynamicAllocation.minExecutors.

If not, you should see the following WARN message in the logs:

spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.

spark.dynamicAllocation.initialExecutors falls back to spark.dynamicAllocation.minExecutors if not set. Why print the WARN message to the logs then?

getDynamicAllocationInitialExecutors then makes sure that spark.executor.instances is equal to or greater than spark.dynamicAllocation.minExecutors.

If not, you should see the following WARN message in the logs:

spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.

Both spark.executor.instances and spark.dynamicAllocation.minExecutors fall back to 0 when not defined explicitly.

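getDynamicAllocationInitialExecutors sets the initial number of executors to be the maximum of:

  • spark.dynamicAllocation.minExecutors

  • spark.dynamicAllocation.initialExecutors

  • spark.executor.instances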

You should see the following INFO message in the logs:

Using initial executors = [initialExecutors], max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
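The steps above can be modeled in plain Scala. In this sketch the hypothetical InitialExecutors object takes a Map[String, String] in place of SparkConf and returns the warnings as a list instead of logging them; the message texts are copied from the WARN messages quoted above.

```scala
// Hypothetical model of getDynamicAllocationInitialExecutors.
// Assumptions: a Map[String, String] stands in for SparkConf, and warnings
// are returned as a list rather than written to the logs.
object InitialExecutors {

  private def intConf(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  /** Returns (initial number of executors, warnings). */
  def compute(conf: Map[String, String]): (Int, List[String]) = {
    val minExecutors = intConf(conf, "spark.dynamicAllocation.minExecutors", 0)
    // initialExecutors falls back to minExecutors when not set
    val initialExecutors = intConf(conf, "spark.dynamicAllocation.initialExecutors", minExecutors)
    // spark.executor.instances falls back to 0 when not set
    val executorInstances = intConf(conf, "spark.executor.instances", 0)

    val warnings = List.newBuilder[String]
    if (initialExecutors < minExecutors)
      warnings += "spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs."
    if (executorInstances < minExecutors)
      warnings += "spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs."

    // The initial number of executors is the maximum of the three settings.
    (Seq(minExecutors, initialExecutors, executorInstances).max, warnings.result())
  }
}
```

With spark.dynamicAllocation.minExecutors = 2 and spark.executor.instances = 5 (e.g. set with --num-executors 5), compute returns 5 and no warnings, since spark.dynamicAllocation.initialExecutors falls back to 2.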

Settings

Table 1. Spark Properties

spark.dynamicAllocation.enabled
  Default value: false
  Flag to enable (true) or disable (false) dynamic allocation.
  NOTE: The spark.executor.instances setting can be set using the --num-executors command-line option of spark-submit.

spark.dynamicAllocation.initialExecutors
  Default value: spark.dynamicAllocation.minExecutors
  Initial number of executors for dynamic allocation.
  NOTE: getDynamicAllocationInitialExecutors warns when spark.dynamicAllocation.initialExecutors is less than spark.dynamicAllocation.minExecutors.

spark.dynamicAllocation.minExecutors
  Default value: 0
  Minimum number of executors for dynamic allocation.
  Must be non-negative (0 or greater) and less than or equal to spark.dynamicAllocation.maxExecutors.

spark.dynamicAllocation.maxExecutors
  Default value: Integer.MAX_VALUE
  Maximum number of executors for dynamic allocation.
  Must be greater than 0 and greater than or equal to spark.dynamicAllocation.minExecutors.

spark.dynamicAllocation.schedulerBacklogTimeout
  Default value: 1s
  Must be greater than 0.

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
  Default value: spark.dynamicAllocation.schedulerBacklogTimeout
  Must be greater than 0.

spark.dynamicAllocation.executorIdleTimeout
  Default value: 60s
  How long an executor can stay idle before it is removed.
  Must be greater than 0.

spark.dynamicAllocation.cachedExecutorIdleTimeout
  Default value: Integer.MAX_VALUE

spark.dynamicAllocation.testing
  Default value: false
  Enables dynamic allocation in Spark local mode (for testing only).
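The constraints listed in the table can be collected into a single check. This is a hypothetical sketch: DynAllocSettings and DynAllocValidation are illustrative names, timeouts are assumed to be already converted to seconds, and the checks cover only the constraints stated in the table.

```scala
// Hypothetical settings holder (not a Spark class). Timeouts are in seconds;
// defaults follow the table of Spark properties.
final case class DynAllocSettings(
    minExecutors: Int = 0,
    maxExecutors: Int = Int.MaxValue,
    schedulerBacklogTimeoutSec: Long = 1L,
    // None means "fall back to schedulerBacklogTimeoutSec"
    sustainedSchedulerBacklogTimeoutSec: Option[Long] = None,
    executorIdleTimeoutSec: Long = 60L)

object DynAllocValidation {

  /** Returns the violated constraints; an empty list means the settings are valid. */
  def validate(s: DynAllocSettings): List[String] = {
    val sustained = s.sustainedSchedulerBacklogTimeoutSec.getOrElse(s.schedulerBacklogTimeoutSec)
    List(
      (s.minExecutors >= 0) -> "spark.dynamicAllocation.minExecutors must be non-negative",
      (s.maxExecutors > 0) -> "spark.dynamicAllocation.maxExecutors must be greater than 0",
      (s.minExecutors <= s.maxExecutors) ->
        "spark.dynamicAllocation.minExecutors must be less than or equal to spark.dynamicAllocation.maxExecutors",
      (s.schedulerBacklogTimeoutSec > 0) -> "spark.dynamicAllocation.schedulerBacklogTimeout must be greater than 0",
      (sustained > 0) -> "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be greater than 0",
      (s.executorIdleTimeoutSec > 0) -> "spark.dynamicAllocation.executorIdleTimeout must be greater than 0"
    ).collect { case (false, message) => message } // keep only failed checks
  }
}
```

Note how an invalid spark.dynamicAllocation.schedulerBacklogTimeout also invalidates spark.dynamicAllocation.sustainedSchedulerBacklogTimeout when the latter is left unset, because it falls back to the former.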

Future

  • SPARK-4922

  • SPARK-4751

  • SPARK-7955