Configuration Properties¶
spark.app.id¶
Unique identifier of a Spark application that Spark uses to identify metric sources.
Default: TaskScheduler.applicationId()
Set when SparkContext is created
spark.broadcast.blockSize¶
The size of each piece of a broadcast block (in kB unless the unit is specified)
Default: 4m
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit
Used when:
- TorrentBroadcast is requested to setConf
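A minimal sketch of overriding this property when building a SparkConf (the application name and the 8m value are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Size properties accept a unit suffix (k, m, g); 8m doubles the default.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("broadcast-blocksize-demo") // hypothetical app name
  .set("spark.broadcast.blockSize", "8m")
val sc = new SparkContext(conf)

// The broadcast value is chunked into 8m pieces for the torrent protocol.
val lookup = sc.broadcast((1 to 100000).map(i => i -> i.toString).toMap)
println(lookup.value.size)
```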
spark.broadcast.compress¶
Controls broadcast variable compression (before sending them over the wire)
Default: true
Generally a good idea. Compression will use spark.io.compression.codec
spark.buffer.pageSize¶
The amount of memory used per page (in bytes)
Default: (undefined)
Used when:
- MemoryManager is created
spark.cleaner.referenceTracking¶
Controls whether to enable ContextCleaner
Default: true
spark.diskStore.subDirectories¶
Number of subdirectories inside each path listed in spark.local.dir for hashing block files into.
Default: 64
Used by BlockManager and DiskBlockManager
spark.driver.host¶
Address of the driver (endpoints)
Default: Utils.localCanonicalHostName
spark.driver.log.allowErasureCoding¶
Whether to allow driver logs to use erasure coding
Default: false
Used when:
- DfsAsyncWriter is requested to init
spark.driver.log.dfsDir¶
The directory on a Hadoop DFS-compliant file system where DriverLogger copies driver logs to
Default: (undefined)
Used when:
- FsHistoryProvider is requested to startPolling (and cleanDriverLogs)
- DfsAsyncWriter is requested to init
- DriverLogger utility is used to create a DriverLogger (for a SparkContext)
spark.driver.log.persistToDfs.enabled¶
Enables DriverLogger
Default: false
Used when:
- DriverLogger utility is used to create a DriverLogger (for a SparkContext)
spark.driver.maxResultSize¶
Maximum size of task results (in bytes)
Default: 1g
Used when:
- TaskRunner is requested to run a task (and decide on the type of a serialized task result)
- TaskSetManager is requested to check available memory for task results
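A sketch of the property in action, assuming a local master (the 1m cap is deliberately tiny for demonstration): collecting more task results than the cap aborts the job instead of exhausting driver memory.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("max-result-size-demo")      // hypothetical app name
  .set("spark.driver.maxResultSize", "1m") // default: 1g
val sc = new SparkContext(conf)

// Serialized results far exceed 1m, so the job is aborted with a
// SparkException that names spark.driver.maxResultSize.
try sc.parallelize(1 to 1000000).map(i => s"row-$i" * 10).collect()
catch { case e: SparkException => println(e.getMessage) }
```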
spark.driver.port¶
Port of the driver (endpoints)
Default: 0
spark.executor.cores¶
Number of CPU cores for Executor
Default: 1
spark.executor.heartbeat.maxFailures¶
Number of times an Executor tries sending heartbeats to the driver before it gives up and exits (with exit code 56
).
Default: 60
For example, with max failures of 60 (the default) and spark.executor.heartbeatInterval of 10s, the Executor will try to send heartbeats for up to 600s (10 minutes).
Used when:
- Executor is created (and reportHeartBeat)
spark.executor.heartbeatInterval¶
Interval between Executor heartbeats (to the driver)
Default: 10s
Used when:
- SparkContext is created
- Executor is created and requested to reportHeartBeat
- HeartbeatReceiver is created
spark.executor.id¶
Default: (undefined)
spark.executor.instances¶
Number of executors to use
Default: (undefined)
spark.executor.memory¶
Amount of memory to use for an Executor
Default: 1g
Equivalent to SPARK_EXECUTOR_MEMORY environment variable.
spark.executor.memoryOverhead¶
The amount of non-heap memory (in MiB) to be allocated per executor
Used when:
- ResourceProfile is requested for the default executor resources
- Client (Spark on YARN) is created
spark.executor.metrics.fileSystemSchemes¶
A comma-separated list of the file system schemes to report in executor metrics
Default: file,hdfs
spark.executor.metrics.pollingInterval¶
How often to collect executor metrics (in ms):
- 0 - the polling is done on executor heartbeats
- A positive number - the polling is done at this interval
Default: 0
Used when:
- Executor is created
spark.executor.userClassPathFirst¶
Controls whether to load classes in user-defined jars before those in Spark jars
Default: false
Used when:
- CoarseGrainedExecutorBackend is requested to create a ClassLoader
- Executor is created
- Client utility (Spark on YARN) is used to isUserClassPathFirst
spark.extraListeners¶
A comma-separated list of fully-qualified class names of SparkListeners (to be registered when SparkContext is created)
Default: (empty)
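A minimal sketch of a listener that could be registered this way (the class and package names are hypothetical); the class must be on the classpath and have a zero-argument constructor (or one accepting a SparkConf):

```scala
package com.example // hypothetical package

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Registered with: --conf spark.extraListeners=com.example.JobBoundaryListener
class JobBoundaryListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
}
```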
spark.file.transferTo¶
Controls whether to use Java FileChannels (Java NIO) for copying data between two Java FileInputStreams to improve copy performance
Default: true
Used when:
- BypassMergeSortShuffleWriter and UnsafeShuffleWriter are created
spark.files¶
The files to be added to a Spark application (that can be defined directly as a configuration property or indirectly using the --files option of the spark-submit script)
Default: (empty)
Used when:
- SparkContext is created
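A minimal sketch, assuming a file /tmp/lookup.csv exists on the driver host; tasks resolve the distributed copy with SparkFiles.get:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("spark-files-demo")        // hypothetical app name
  .set("spark.files", "/tmp/lookup.csv") // same effect as --files
val sc = new SparkContext(conf)

// Each task reads the locally-materialized copy of the file.
sc.parallelize(1 to 2).foreach { _ =>
  println(s"local copy at ${SparkFiles.get("lookup.csv")}")
}
```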
spark.io.encryption.enabled¶
Controls local disk I/O encryption
Default: false
Used when:
- SparkEnv utility is used to create a SparkEnv for the driver (to create an IO encryption key)
- BlockStoreShuffleReader is requested to read combined records (and fetchContinuousBlocksInBatch)
spark.jars¶
Default: (empty)
spark.kryo.pool¶
Controls whether KryoSerializer should use a pool (of Kryo instances)
Default: true
Used when:
- KryoSerializer is created
spark.kryo.unsafe¶
Whether KryoSerializer should use Unsafe-based IO for serialization
Default: false
spark.local.dir¶
A comma-separated list of directory paths for "scratch" space (a temporary storage for map output files, RDDs that get stored on disk, etc.). It is recommended to use paths on fast local disks in your system (e.g. SSDs).
Default: the java.io.tmpdir System property
spark.locality.wait¶
How long to wait until an executor is available for locality-aware delay scheduling (for PROCESS_LOCAL, NODE_LOCAL, and RACK_LOCAL TaskLocalities) unless a locality-specific setting is set (i.e., spark.locality.wait.process, spark.locality.wait.node, and spark.locality.wait.rack, respectively)
Default: 3s
spark.locality.wait.legacyResetOnTaskLaunch¶
(internal) Whether to use the legacy behavior of locality wait, which resets the delay timer anytime a task is scheduled.
Default: false
spark.locality.wait.node¶
Scheduling delay for TaskLocality.NODE_LOCAL
Default: spark.locality.wait
Used when:
- TaskSetManager is requested for the locality wait (of TaskLocality.NODE_LOCAL)
spark.locality.wait.process¶
Scheduling delay for TaskLocality.PROCESS_LOCAL
Default: spark.locality.wait
Used when:
- TaskSetManager is requested for the locality wait (of TaskLocality.PROCESS_LOCAL)
spark.locality.wait.rack¶
Scheduling delay for TaskLocality.RACK_LOCAL
Default: spark.locality.wait
Used when:
- TaskSetManager is requested for the locality wait (of TaskLocality.RACK_LOCAL)
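A sketch of tuning the waits together (the values are illustrative): keep the 3s top-level default but give up on NODE_LOCAL placement immediately, while waiting longer before leaving the rack.

```scala
import org.apache.spark.SparkConf

// The per-level settings fall back to spark.locality.wait when unset.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")      // default for all levels
  .set("spark.locality.wait.node", "0s") // schedule non-node-local right away
  .set("spark.locality.wait.rack", "6s") // wait longer before leaving the rack
```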
spark.logConf¶
Controls whether to log the effective SparkConf (as INFO) when a SparkContext is started
Default: false
spark.logLineage¶
Enables printing out the RDD lineage graph (using RDD.toDebugString) when executing an action (and running a job)
Default: false
spark.master¶
Master URL of the cluster manager to connect the Spark application to
spark.memory.fraction¶
Fraction of JVM heap space used for execution and storage.
Default: 0.6
The lower the value, the more frequent spills and cached data eviction. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. Leaving this at the default value is recommended.
spark.memory.offHeap.enabled¶
Controls whether Tungsten memory will be allocated on the JVM heap (false) or off-heap (true, using sun.misc.Unsafe).
Default: false
When enabled, spark.memory.offHeap.size must be greater than 0.
Used when:
- MemoryManager is requested for tungstenMemoryMode
spark.memory.offHeap.size¶
Maximum memory (in bytes) for off-heap memory allocation
Default: 0
This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
Must not be negative, and must be a positive value when spark.memory.offHeap.enabled is enabled
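A minimal sketch of enabling off-heap memory (the 2g size is illustrative):

```scala
import org.apache.spark.SparkConf

// Off-heap Tungsten memory requires an explicit positive size when enabled.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")
```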
spark.memory.storageFraction¶
Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction.
Default: 0.5
The higher the value, the less working memory may be available to execution, and tasks may spill to disk more often. The default value is recommended.
Must be in [0,1)
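A worked sketch of the resulting split for a hypothetical 4g executor heap, assuming the fixed 300 MB reservation Spark applies before spark.memory.fraction:

```scala
val heapBytes     = 4L * 1024 * 1024 * 1024     // hypothetical 4g heap
val reservedBytes = 300L * 1024 * 1024          // fixed reserved memory
val usableBytes   = heapBytes - reservedBytes
val unifiedBytes  = (usableBytes * 0.6).toLong  // spark.memory.fraction
val immuneBytes   = (unifiedBytes * 0.5).toLong // spark.memory.storageFraction

// ~2277 MB unified region, of which ~1138 MB of storage is eviction-immune
println(s"unified=${unifiedBytes >> 20} MB, immune storage=${immuneBytes >> 20} MB")
```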
spark.network.io.preferDirectBufs¶
Default: true
spark.network.maxRemoteBlockSizeFetchToMem¶
Remote blocks are fetched to disk when their size is above this threshold (in bytes)
This is to avoid a giant request taking too much memory. Note this configuration will affect both shuffle fetch and block manager remote block fetch.
With an external shuffle service, use at least version 2.3.0
Default: 200m
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
- NettyBlockTransferService is requested to uploadBlock
- BlockManager is requested to fetchRemoteManagedBuffer
spark.network.timeout¶
Network timeout (in seconds) to use for RPC remote endpoint lookup
Default: 120s
spark.network.timeoutInterval¶
(in millis)
Default: spark.storage.blockManagerTimeoutIntervalMs
spark.rdd.compress¶
Controls whether to compress RDD partitions when stored serialized
Default: false
spark.reducer.maxBlocksInFlightPerAddress¶
Maximum number of remote blocks being fetched per reduce task from a given host port
When a large number of blocks are being requested from a given address in a single fetch or simultaneously, this could crash the serving executor or a Node Manager. This is especially useful to reduce the load on the Node Manager when external shuffle is enabled. You can mitigate the issue by setting it to a lower value.
Default: (unlimited)
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
spark.reducer.maxReqsInFlight¶
Maximum number of remote requests to fetch blocks at any given point
When the number of hosts in the cluster increases, it might lead to a very large number of inbound connections to one or more nodes, causing the workers to fail under load. Limiting the number of fetch requests mitigates this scenario.
Default: (unlimited)
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
spark.reducer.maxSizeInFlight¶
Maximum size of all map outputs to fetch simultaneously from each reduce task (in MiB unless otherwise specified)
Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory
Default: 48m
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
spark.repl.class.uri¶
(internal) The URI of the Spark REPL's class server from which executors load classes defined in the REPL
Default: (undefined)
spark.rpc.lookupTimeout¶
Default Endpoint Lookup Timeout
Default: 120s
spark.rpc.message.maxSize¶
Maximum allowed message size for RPC communication (in MB unless specified)
Default: 128
Must be below 2047MB (Int.MaxValue / 1024 / 1024)
Used when:
- CoarseGrainedSchedulerBackend is requested to launch tasks
- RpcUtils is requested for the maximum message size
- Executor is created
- MapOutputTrackerMaster is created (and makes sure that spark.shuffle.mapOutput.minSizeForBroadcast is below the threshold)
spark.scheduler¶
barrier.maxConcurrentTasksCheck.interval¶
spark.scheduler.barrier.maxConcurrentTasksCheck.interval
barrier.maxConcurrentTasksCheck.maxFailures¶
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures
minRegisteredResourcesRatio¶
spark.scheduler.minRegisteredResourcesRatio
Minimum ratio of (registered resources / total expected resources) before submitting tasks
Default: (undefined)
revive.interval¶
spark.scheduler.revive.interval
The time (in millis) between revives of resource offers
Default: 1s
Used when:
- DriverEndpoint is requested to onStart
spark.serializer¶
The fully-qualified class name of the Serializer (of the driver and executors)
Default: org.apache.spark.serializer.JavaSerializer
Used when:
- SparkEnv utility is used to create a SparkEnv
- SparkConf is requested to registerKryoClasses (as a side-effect)
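A minimal sketch of switching to Kryo (the Point class is hypothetical). The explicit set is redundant here: registerKryoClasses also sets spark.serializer to KryoSerializer as the side-effect mentioned above.

```scala
import org.apache.spark.SparkConf

case class Point(x: Double, y: Double) // hypothetical application class

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Point])) // also sets spark.serializer
```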
spark.shuffle¶
sort.io.plugin.class¶
spark.shuffle.sort.io.plugin.class
Name of the class to use for shuffle IO
Default: LocalDiskShuffleDataIO
Used when:
- ShuffleDataIOUtils is requested to loadShuffleDataIO
checksum.enabled¶
spark.shuffle.checksum.enabled
Controls checksumming of shuffle data. If enabled, Spark will calculate the checksum values for each partition data within the map output file and store the values in a checksum file on the disk. When shuffle data corruption is detected, Spark will try to diagnose the cause (e.g., network issue, disk issue, etc.) of the corruption by using the checksum file.
Default: true
compress¶
spark.shuffle.compress
Enables compressing shuffle output when stored
Default: true
detectCorrupt¶
spark.shuffle.detectCorrupt
Controls corruption detection in fetched blocks
Default: true
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
detectCorrupt.useExtraMemory¶
spark.shuffle.detectCorrupt.useExtraMemory
If enabled, part of a compressed/encrypted stream will be de-compressed/de-crypted by using extra memory to detect early corruption. Any IOException thrown will cause the task to be retried once, and if it fails again with the same exception, then FetchFailedException will be thrown to retry the previous stage
Default: false
Used when:
- BlockStoreShuffleReader is requested to read combined records for a reduce task
file.buffer¶
spark.shuffle.file.buffer
Size of the in-memory buffer for each shuffle file output stream, in KiB unless otherwise specified. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
Default: 32k
Must be greater than 0 and less than or equal to 2097151 ((Integer.MAX_VALUE - 15) / 1024)
Used when the following are created:
- BypassMergeSortShuffleWriter
- ShuffleExternalSorter
- UnsafeShuffleWriter
- ExternalAppendOnlyMap
- ExternalSorter
manager¶
spark.shuffle.manager
A fully-qualified class name or the alias of the ShuffleManager in a Spark application
Default: sort
Supported aliases:
- sort
- tungsten-sort
Used when SparkEnv object is requested to create a "base" SparkEnv for a driver or an executor
mapOutput.parallelAggregationThreshold¶
spark.shuffle.mapOutput.parallelAggregationThreshold
(internal) Multi-thread is used when the number of mappers * shuffle partitions is greater than or equal to this threshold. Note that the actual parallelism is calculated by number of mappers * shuffle partitions / this threshold + 1, so this threshold should be positive.
Default: 10000000
Used when:
- MapOutputTrackerMaster is requested for the statistics of a ShuffleDependency
minNumPartitionsToHighlyCompress¶
spark.shuffle.minNumPartitionsToHighlyCompress
(internal) Minimum number of partitions (threshold) for MapStatus utility to prefer a HighlyCompressedMapStatus (over CompressedMapStatus) (for ShuffleWriters).
Default: 2000
Must be a positive integer (above 0)
push.enabled¶
spark.shuffle.push.enabled
Enables push-based shuffle on the client side
Default: false
Works in conjunction with the server side flag spark.shuffle.push.server.mergedShuffleFileManagerImpl, which needs to be set with the appropriate org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based shuffle to be enabled
Used when:
- Utils utility is used to determine whether push-based shuffle is enabled or not
readHostLocalDisk¶
spark.shuffle.readHostLocalDisk
If enabled (with spark.shuffle.useOldFetchProtocol disabled and spark.shuffle.service.enabled enabled), shuffle blocks requested from those block managers which are running on the same host are read from the disk directly instead of being fetched as remote blocks over the network.
Default: true
registration.maxAttempts¶
spark.shuffle.registration.maxAttempts
How many attempts to register a BlockManager with External Shuffle Service
Default: 3
Used when BlockManager is requested to register with External Shuffle Server
sort.bypassMergeThreshold¶
spark.shuffle.sort.bypassMergeThreshold
Maximum number of reduce partitions below which SortShuffleManager avoids merge-sorting data when there is no map-side aggregation
Default: 200
Used when:
- SortShuffleWriter utility is used to shouldBypassMergeSort
- ShuffleExchangeExec (Spark SQL) physical operator is requested to prepareShuffleDependency
spill.initialMemoryThreshold¶
spark.shuffle.spill.initialMemoryThreshold
Initial threshold for the size of an in-memory collection
Default: 5MB
Used by Spillable
spill.numElementsForceSpillThreshold¶
spark.shuffle.spill.numElementsForceSpillThreshold
(internal) The maximum number of elements in memory before forcing the shuffle sorter to spill.
Default: Integer.MAX_VALUE
The default value is to never force the sorter to spill, until Spark reaches some limitations, like the max page size limitation for the pointer array in the sorter.
Used when:
- ShuffleExternalSorter is created
- Spillable is created
- Spark SQL's SortBasedAggregator is requested for an UnsafeKVExternalSorter
- Spark SQL's ObjectAggregationMap is requested to dumpToExternalSorter
- Spark SQL's UnsafeExternalRowSorter is created
- Spark SQL's UnsafeFixedWidthAggregationMap is requested for an UnsafeKVExternalSorter
sync¶
spark.shuffle.sync
Controls whether DiskBlockObjectWriter should force outstanding writes to disk while committing a single atomic block (i.e. all operating system buffers should synchronize with the disk to ensure that all changes to a file are in fact recorded in the storage)
Default: false
Used when BlockManager is requested for a DiskBlockObjectWriter
useOldFetchProtocol¶
spark.shuffle.useOldFetchProtocol
Whether to use the old protocol while doing the shuffle block fetching. It should only be enabled for compatibility, i.e. when a job on a newer Spark version needs to fetch shuffle blocks from an older-version external shuffle service.
Default: false
spark.speculation¶
Controls Speculative Execution of Tasks
Default: false
spark.speculation.interval¶
The time interval between checks for tasks to speculate in Speculative Execution of Tasks.
Default: 100ms
spark.speculation.multiplier¶
How many times slower a task is than the median to be considered for speculation
Default: 1.5
spark.speculation.quantile¶
The percentage of tasks that must be complete before speculation is enabled in Speculative Execution of Tasks.
Default: 0.75
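A worked sketch of how the two settings combine (the real bookkeeping lives in TaskSetManager; the median duration here is hypothetical): with 100 tasks and the defaults above, speculation starts once 75 tasks have finished, for any task running longer than 1.5x the median.

```scala
val numTasks   = 100
val quantile   = 0.75  // spark.speculation.quantile
val multiplier = 1.5   // spark.speculation.multiplier
val medianMs   = 2000L // hypothetical median task duration

val minFinished = (quantile * numTasks).floor.toInt // 75 tasks
val thresholdMs = (multiplier * medianMs).toLong    // 3000 ms
println(s"speculate once $minFinished tasks finish, for tasks over $thresholdMs ms")
```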
spark.storage.blockManagerSlaveTimeoutMs¶
(in millis)
Default: spark.network.timeout
spark.storage.blockManagerTimeoutIntervalMs¶
(in millis)
Default: 60s
spark.storage.localDiskByExecutors.cacheSize¶
The max number of executors for which the local dirs are stored. This size is applied both to the driver and to the executor side, to avoid having an unbounded store. This cache will be used to avoid the network in case of fetching disk-persisted RDD blocks or shuffle blocks (when spark.shuffle.readHostLocalDisk is set) from the same host.
Default: 1000
spark.storage.replication.policy¶
Default: RandomBlockReplicationPolicy
spark.storage.unrollMemoryThreshold¶
Initial memory threshold (in bytes) to unroll (materialize) a block to store in memory
Default: 1024 * 1024
Must be at most the total amount of memory available for storage
Used when:
MemoryStore
is created
spark.submit.deployMode¶
- client (default)
- cluster
spark.task.cpus¶
The number of CPU cores to schedule (allocate) to a task
Default: 1
Used when:
- ExecutorAllocationManager is created
- TaskSchedulerImpl is created
- AppStatusListener is requested to handle a SparkListenerEnvironmentUpdate event
- SparkContext utility is used to create a TaskScheduler
- ResourceProfile is requested to getDefaultTaskResources
- LocalityPreferredContainerPlacementStrategy is requested to numExecutorsPending
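A worked sketch of how this interacts with spark.executor.cores: the number of concurrent tasks per executor is the integer ratio of the two (the values are illustrative).

```scala
val executorCores = 4 // spark.executor.cores
val taskCpus      = 2 // spark.task.cpus

// Each executor can run executorCores / taskCpus tasks concurrently.
val slotsPerExecutor = executorCores / taskCpus // 2 concurrent tasks
println(s"$slotsPerExecutor concurrent tasks per executor")
```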
spark.task.maxDirectResultSize¶
Maximum size of a task result (in bytes) to be sent to the driver as a DirectTaskResult
Default: 1048576B (1L << 20)
Used when:
- TaskRunner is requested to run a task (and decide on the type of a serialized task result)
spark.task.maxFailures¶
Number of failures of a single task (of a TaskSet) before giving up on the entire TaskSet and then the job
Default: 4
spark.plugins¶
A comma-separated list of class names implementing org.apache.spark.api.plugin.SparkPlugin to load into a Spark application.
Default: (empty)
Since: 3.0.0
Set when SparkContext is created
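A minimal sketch of a plugin that could be listed in spark.plugins (the class name is hypothetical); either side may return null when not needed:

```scala
import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Registered with: --conf spark.plugins=com.example.AuditPlugin
class AuditPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      println(s"driver plugin initialized for ${sc.applicationId}")
      Collections.emptyMap[String, String]() // extra config forwarded to executor plugins
    }
  }
  override def executorPlugin(): ExecutorPlugin = null // driver-only plugin
}
```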
spark.plugins.defaultList¶
FIXME
spark.ui.showConsoleProgress¶
Controls whether to enable ConsoleProgressBar and show the progress bar in the console
Default: false