ExternalShuffleService

ExternalShuffleService is a Spark service that can serve RDD and shuffle blocks.

ExternalShuffleService manages shuffle output files so they are available to executors. Because the shuffle output files are managed outside the executors, they remain accessible even when executors are killed or down (especially with Dynamic Allocation of Executors).

ExternalShuffleService can be launched from the command line.

ExternalShuffleService is enabled on the driver and executors using the spark.shuffle.service.enabled configuration property.
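
As a rough illustration of enabling the service from the application side, the sketch below collects the relevant properties and renders them as spark-submit style --conf arguments. The object and method names are hypothetical, and pairing the property with spark.dynamicAllocation.enabled is an assumption about a typical setup rather than something this page prescribes.

```scala
// Hypothetical helper; not part of Spark.
object ShuffleServiceConfSketch {
  // Properties an application might set to rely on the external shuffle service.
  // Combining it with dynamic allocation is an assumption about a common setup.
  val props: Seq[(String, String)] = Seq(
    "spark.shuffle.service.enabled" -> "true",
    "spark.dynamicAllocation.enabled" -> "true"
  )

  // Renders key/value pairs as spark-submit style "--conf key=value" arguments.
  def toSubmitArgs(kvs: Seq[(String, String)]): Seq[String] =
    kvs.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

The rendered arguments can be appended to a spark-submit command line, or the same keys can be placed in conf/spark-defaults.conf.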

Note

Spark on YARN uses a custom external shuffle service (YarnShuffleService).

Launching ExternalShuffleService

ExternalShuffleService can be launched as a standalone application using spark-class.

spark-class org.apache.spark.deploy.ExternalShuffleService

main Entry Point

main(
  args: Array[String]): Unit

main is the entry point of ExternalShuffleService standalone application.

main prints out the following INFO message to the logs:

Started daemon with process name: [name]

main registers signal handlers for TERM, HUP, INT signals.

main loads the default Spark properties.

main creates a SecurityManager.

main sets spark.shuffle.service.enabled to true explicitly (since the service was started from the command line on purpose).

main creates an ExternalShuffleService and starts it.

main prints out the following DEBUG message to the logs:

Adding shutdown hook

main registers a shutdown hook. When triggered, the shutdown hook prints the following INFO message to the logs and requests the ExternalShuffleService to stop.

Shutting down shuffle service.
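
The order of steps in main can be sketched roughly as follows. This is a minimal, self-contained approximation: SketchShuffleService and MainFlowSketch are hypothetical stand-ins, a plain Map replaces the Spark configuration, and the JVM's Runtime.addShutdownHook is used in place of whatever hook mechanism Spark itself relies on. Signal handling and SecurityManager creation are left out.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the real service; not Spark's class.
final class SketchShuffleService {
  private val running = new AtomicBoolean(false)
  def start(): Unit = running.set(true)
  def stop(): Unit = running.set(false)
  def isRunning: Boolean = running.get()
}

object MainFlowSketch {
  // Follows the described order: force-enable the property, create and start
  // the service, then register a shutdown hook that stops it.
  def launch(props: Map[String, String]): (Map[String, String], SketchShuffleService, Thread) = {
    // Whatever the loaded properties say, the service is enabled explicitly.
    val effective = props + ("spark.shuffle.service.enabled" -> "true")

    val service = new SketchShuffleService
    service.start()

    // Plain JVM shutdown hook (an assumption of this sketch).
    val hook = new Thread(() => {
      println("Shutting down shuffle service.")
      service.stop()
    })
    Runtime.getRuntime.addShutdownHook(hook)

    (effective, service, hook)
  }
}
```

Returning the hook thread is only a convenience of the sketch so that the stop path can be exercised without terminating the JVM.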

Creating Instance

ExternalShuffleService takes the following to be created:

  • SparkConf
  • SecurityManager

ExternalShuffleService is created when:

  • ExternalShuffleService standalone application is started
  • Worker (Spark Standalone) is created (and initializes an ExternalShuffleService)

TransportServer

server: TransportServer

ExternalShuffleService uses an internal reference to a TransportServer that is created when ExternalShuffleService is started.

ExternalShuffleService uses an ExternalBlockHandler to handle RPC messages (and serve RDD blocks and shuffle blocks).

TransportServer is requested to close when ExternalShuffleService is requested to stop.

TransportServer is used for metrics.

Port

ExternalShuffleService uses the spark.shuffle.service.port configuration property for the port to listen on when started.

spark.shuffle.service.enabled

ExternalShuffleService uses the spark.shuffle.service.enabled configuration property to control whether or not it is enabled (and should be started when requested).

ExternalBlockHandler

blockHandler: ExternalBlockHandler

ExternalShuffleService creates an ExternalBlockHandler when created.

With spark.shuffle.service.db.enabled and spark.shuffle.service.enabled configuration properties enabled, the ExternalBlockHandler is given a local directory with a registeredExecutors.ldb file.

blockHandler is used to create a TransportContext that creates the TransportServer.

blockHandler is used when:

  • ExternalShuffleService is requested to start, executorRemoved and applicationRemoved

findRegisteredExecutorsDBFile

findRegisteredExecutorsDBFile(
  dbName: String): File

findRegisteredExecutorsDBFile returns the input dbName file in one of the local directories (defined using the spark.local.dir configuration property) or null when no directories are defined.

findRegisteredExecutorsDBFile searches the local directories for one that already contains the dbName file. If none does, findRegisteredExecutorsDBFile uses the first local directory.

With no local directories defined in the spark.local.dir configuration property, findRegisteredExecutorsDBFile prints out the following WARN message to the logs and returns null.

'spark.local.dir' should be set first when we use db in ExternalShuffleService. Note that this only affects standalone mode.
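
The lookup described above can be approximated with the following self-contained sketch. The object name and the localDirsProp parameter are hypothetical (the parameter stands in for the value of spark.local.dir), treating the property as a comma-separated list is an assumption of the sketch, and the warning goes to standard error instead of Spark's logger.

```scala
import java.io.File

// Hypothetical helper that mirrors the described lookup; not Spark's code.
object RegisteredExecutorsDbSketch {
  // localDirsProp: value of spark.local.dir if set (assumed comma-separated).
  def findDbFile(localDirsProp: Option[String], dbName: String): File = {
    val localDirs = localDirsProp.map(_.split(",")).getOrElse(Array.empty[String])
    if (localDirs.nonEmpty) {
      // Prefer a directory that already holds the db file,
      // otherwise fall back to the first configured directory.
      val dir = localDirs
        .find(d => new File(d, dbName).exists())
        .getOrElse(localDirs(0))
      new File(dir, dbName)
    } else {
      System.err.println(
        "'spark.local.dir' should be set first when we use db in " +
          "ExternalShuffleService. Note that this only affects standalone mode.")
      null
    }
  }
}
```

Returning null follows the behaviour described in this section; a caller in new code would more likely wrap the result in an Option.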

Starting ExternalShuffleService

start(): Unit

start prints out the following INFO message to the logs:

Starting shuffle service on port [port] (auth enabled = [authEnabled])

start creates an AuthServerBootstrap when authentication is enabled (according to the SecurityManager).

start creates a TransportContext (with the ExternalBlockHandler) and requests it to create a server (on the port).

start...FIXME

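start is used when:

  • ExternalShuffleService is requested to startIfEnabled
  • ExternalShuffleService standalone application is started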

startIfEnabled

startIfEnabled(): Unit

startIfEnabled starts the external shuffle service if enabled.

startIfEnabled is used when:

  • Worker (Spark Standalone) is requested to startExternalShuffleService
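
The gating done by startIfEnabled can be sketched as below. GatedServiceSketch is a hypothetical stand-in that reads its settings from a plain Map; the fallback values used here (service disabled, port 7337) are my recollection of Spark's documented defaults and should be treated as assumptions. No server is actually created.

```scala
// Hypothetical stand-in; it only models the enabled check and the log line.
final class GatedServiceSketch(conf: Map[String, String], authEnabled: Boolean = false) {
  // Fallback values are assumptions about Spark's defaults.
  private val enabled =
    conf.getOrElse("spark.shuffle.service.enabled", "false").toBoolean
  private val port =
    conf.getOrElse("spark.shuffle.service.port", "7337").toInt

  private var started = false
  def isStarted: Boolean = started

  def start(): Unit = {
    println(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
    started = true
  }

  // Starts the service only when the enabled property is true.
  def startIfEnabled(): Unit = if (enabled) start()
}
```

With an empty Map the call to startIfEnabled is a no-op, which corresponds to a Worker running without the external shuffle service.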

Executor Removed Notification

executorRemoved(
  executorId: String,
  appId: String): Unit

executorRemoved requests the ExternalBlockHandler to executorRemoved.

executorRemoved is used when:

Application Finished Notification

applicationRemoved(
  appId: String): Unit

applicationRemoved requests the ExternalBlockHandler to applicationRemoved (with cleanupLocalDirs flag enabled).

applicationRemoved is used when:

  • Worker (Spark Standalone) is requested to handle WorkDirCleanup message and maybeCleanupApplication
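
Both notifications are plain delegations to the block handler, which the following sketch tries to make concrete. RecordingHandlerSketch and NotificationSketch are hypothetical stand-ins; the handler only records the calls it receives, so the sketch shows the call shape (including the cleanupLocalDirs flag) and nothing about actual cleanup.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the block handler; it only records calls.
final class RecordingHandlerSketch {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def executorRemoved(executorId: String, appId: String): Unit =
    calls += s"executorRemoved($executorId, $appId)"

  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit =
    calls += s"applicationRemoved($appId, cleanupLocalDirs=$cleanupLocalDirs)"
}

// Hypothetical stand-in for the service side of the two notifications.
final class NotificationSketch(handler: RecordingHandlerSketch) {
  def executorRemoved(executorId: String, appId: String): Unit =
    handler.executorRemoved(executorId, appId)

  // The application-level notification always asks for local dirs cleanup.
  def applicationRemoved(appId: String): Unit =
    handler.applicationRemoved(appId, cleanupLocalDirs = true)
}
```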

Logging

Enable ALL logging level for org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.ExternalShuffleService=ALL

Refer to Logging.