Skip to content

Dynamic Allocation of Executors

Dynamic Allocation of Executors (Dynamic Resource Allocation or Elastic Scaling) is a Spark service for adding and removing Spark executors dynamically on demand to match workload.

Unlike the "traditional" static allocation where a Spark application reserves CPU and memory resources upfront (irrespective of how much it may eventually use), in dynamic allocation you get as much as needed and no more. It scales the number of executors up and down based on workload, i.e. idle executors are removed, and when there are pending tasks waiting for executors to be launched on, dynamic allocation requests them.

Dynamic Allocation is enabled (and SparkContext creates an ExecutorAllocationManager) when:

  1. spark.dynamicAllocation.enabled configuration property is enabled

  2. spark.master is non-local

  3. SchedulerBackend is an ExecutorAllocationClient

ExecutorAllocationManager is the heart of Dynamic Resource Allocation.

When enabled, it is recommended to use the External Shuffle Service.

Dynamic Allocation comes with the policy of scaling executors up and down as follows:

  1. Scale Up Policy requests new executors when there are pending tasks and increases the number of executors exponentially since executors start slow and Spark application may need slightly more.
  2. Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds.

Performance Metrics

ExecutorAllocationManagerSource metric source is used to report performance metrics.

SparkContext.killExecutors

SparkContext.killExecutors is unsupported with Dynamic Allocation enabled.

Programmable Dynamic Allocation

SparkContext offers a developer API to scale executors up or down.

Getting Initial Number of Executors for Dynamic Allocation

getDynamicAllocationInitialExecutors(conf: SparkConf): Int

getDynamicAllocationInitialExecutors first makes sure that <> is equal or greater than <>.

NOTE: <> falls back to <> if not set. Why to print the WARN message to the logs?

If not, you should see the following WARN message in the logs:

spark.dynamicAllocation.initialExecutors less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.

getDynamicAllocationInitialExecutors makes sure that executor:Executor.md#spark.executor.instances[spark.executor.instances] is greater than <>.

NOTE: Both executor:Executor.md#spark.executor.instances[spark.executor.instances] and <> fall back to 0 when no defined explicitly.

If not, you should see the following WARN message in the logs:

spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.

getDynamicAllocationInitialExecutors sets the initial number of executors to be the maximum of:

You should see the following INFO message in the logs:

Using initial executors = [initialExecutors], max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances

getDynamicAllocationInitialExecutors is used when ExecutorAllocationManager is requested to set the initial number of executors.

Resources

Documentation

Slides