ExternalShuffleService
ExternalShuffleService is a Spark service that can serve RDD and shuffle blocks.
ExternalShuffleService manages shuffle output files so they are available to executors. Since the shuffle output files are managed outside the executors, ExternalShuffleService offers uninterrupted access to them even when executors are killed or go down (especially with Dynamic Allocation of Executors).
ExternalShuffleService can be launched from the command line.
ExternalShuffleService is enabled on the driver and executors using the spark.shuffle.service.enabled configuration property.
Note
Spark on YARN uses a custom external shuffle service (YarnShuffleService).
Launching ExternalShuffleService
ExternalShuffleService can be launched as a standalone application using spark-class.
spark-class org.apache.spark.deploy.ExternalShuffleService
main Entry Point
main(args: Array[String]): Unit
main is the entry point of the ExternalShuffleService standalone application.
main prints out the following INFO message to the logs:
Started daemon with process name: [name]
main registers signal handlers for the TERM, HUP and INT signals.
main loads the default Spark properties.
main creates a SecurityManager.
main explicitly sets spark.shuffle.service.enabled to true (the service was launched from the command line, so it is meant to run).
main creates an ExternalShuffleService and starts it.
main prints out the following DEBUG message to the logs:
Adding shutdown hook
main registers a shutdown hook. When triggered, the shutdown hook prints the following INFO message to the logs and requests the ExternalShuffleService to stop.
Shutting down shuffle service.
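The steps of main can be modelled in plain Scala to show their order: force the service on, start it, then register a shutdown action that stops it. This is a minimal, hypothetical sketch rather than Spark's code: SketchShuffleService stands in for ExternalShuffleService, the configuration is a plain Map[String, String], and signal handlers and the SecurityManager are left out. Blocking on a CountDownLatch until the shutdown hook fires is an assumption about how such a daemon can stay alive.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for ExternalShuffleService (not Spark's class).
final class SketchShuffleService(val conf: Map[String, String]) {
  @volatile private var running = false
  def isRunning: Boolean = running
  def start(): Unit = { running = true }
  def stop(): Unit = { running = false }
}

object ShuffleServiceMainSketch {
  // Released by the shutdown action so that main can return.
  private val barrier = new CountDownLatch(1)

  // Follows the documented order of main's steps; signal handlers and the
  // SecurityManager are omitted from this sketch.
  def launch(defaults: Map[String, String]): (SketchShuffleService, () => Unit) = {
    val processName = ManagementFactory.getRuntimeMXBean().getName()
    println(s"Started daemon with process name: $processName")
    // Force the service on, since it was launched explicitly.
    val conf = defaults + ("spark.shuffle.service.enabled" -> "true")
    val service = new SketchShuffleService(conf)
    service.start()
    val shutdown = () => {
      println("Shutting down shuffle service.")
      service.stop()
      barrier.countDown()
    }
    (service, shutdown)
  }

  def main(args: Array[String]): Unit = {
    val (_, shutdown) = launch(Map.empty)
    println("Adding shutdown hook")
    sys.addShutdownHook(shutdown())
    // Block until the shutdown hook runs (e.g. on SIGTERM or Ctrl+C).
    barrier.await()
  }
}
```

launch returns the shutdown action instead of registering it, so the start/stop sequence can be exercised without terminating the JVM.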
Creating Instance
ExternalShuffleService takes the following to be created:
- SparkConf
- SecurityManager
ExternalShuffleService is created when:
- ExternalShuffleService standalone application is started
- Worker (Spark Standalone) is created (and initializes an ExternalShuffleService)
TransportServer
server: TransportServer
ExternalShuffleService uses an internal reference to a TransportServer that is created when ExternalShuffleService is started.
ExternalShuffleService uses an ExternalBlockHandler to handle RPC messages (and serve RDD blocks and shuffle blocks).
TransportServer is requested to close when ExternalShuffleService is requested to stop.
TransportServer is used for metrics.
Port
ExternalShuffleService uses the spark.shuffle.service.port configuration property for the port to listen on when started.
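Resolving the port amounts to a configuration lookup with a default. The sketch below is hypothetical: it reads from a plain Map[String, String] instead of SparkConf, and assumes the default of 7337 listed for spark.shuffle.service.port in Spark's configuration documentation. The range check is an addition of the sketch (port 0 is accepted, meaning "any free port").

```scala
// Hypothetical helper: resolves the port from a plain key/value configuration.
object ShufflePortSketch {
  val PortKey = "spark.shuffle.service.port"
  // Default value of spark.shuffle.service.port in Spark's configuration docs.
  val DefaultPort = 7337

  def resolvePort(conf: Map[String, String]): Int = {
    val port = conf.get(PortKey).map(_.trim.toInt).getOrElse(DefaultPort)
    // Reject values outside the TCP port range (0 lets the OS pick a port).
    require(port >= 0 && port <= 65535, s"Invalid port for $PortKey: $port")
    port
  }
}
```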
spark.shuffle.service.enabled
ExternalShuffleService uses the spark.shuffle.service.enabled configuration property to control whether or not it is enabled (and should be started when requested).
ExternalBlockHandler
blockHandler: ExternalBlockHandler
ExternalShuffleService creates an ExternalBlockHandler when created.
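With both the spark.shuffle.service.db.enabled and spark.shuffle.service.enabled configuration properties enabled, the ExternalBlockHandler is given the registeredExecutors.ldb file in one of the local directories (found using findRegisteredExecutorsDBFile). Otherwise, the ExternalBlockHandler is created without such a file.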
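The condition for handing the ExternalBlockHandler a registered-executors DB file can be sketched as a pure function. This is a hypothetical model: the configuration is a Map[String, String], findDbFile is a made-up parameter standing in for findRegisteredExecutorsDBFile, and the defaults (true for spark.shuffle.service.db.enabled, false for spark.shuffle.service.enabled) are assumed from Spark's configuration documentation.

```scala
import java.io.File

object BlockHandlerDbSketch {
  val DbName = "registeredExecutors.ldb"

  private def flag(conf: Map[String, String], key: String, default: Boolean): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)

  // Returns the DB file the block handler would be given, or None when it gets none.
  // findDbFile is a hypothetical stand-in for findRegisteredExecutorsDBFile and may
  // return null (no local directories), which maps to None.
  def registeredExecutorsFile(conf: Map[String, String], findDbFile: String => File): Option[File] = {
    val dbEnabled = flag(conf, "spark.shuffle.service.db.enabled", default = true)
    val serviceEnabled = flag(conf, "spark.shuffle.service.enabled", default = false)
    if (dbEnabled && serviceEnabled) Option(findDbFile(DbName)) else None
  }
}
```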
blockHandler is used to create a TransportContext that creates the TransportServer.
blockHandler is used when:
findRegisteredExecutorsDBFile
findRegisteredExecutorsDBFile(
dbName: String): File
findRegisteredExecutorsDBFile returns the dbName file in one of the local directories (defined using the spark.local.dir configuration property), or null when no local directories are defined.
findRegisteredExecutorsDBFile searches the local directories for an existing dbName file. If none is found, findRegisteredExecutorsDBFile uses the first local directory.
With no local directories defined in the spark.local.dir configuration property, findRegisteredExecutorsDBFile prints out the following WARN message to the logs and returns null.
'spark.local.dir' should be set first when we use db in ExternalShuffleService. Note that this only affects standalone mode.
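The lookup described above can be sketched with java.io.File alone. This is a hypothetical re-creation, not Spark's source: spark.local.dir is passed in as an optional comma-separated string, entries are trimmed (an addition of the sketch), the WARN message goes to standard error instead of a logger, and null is returned to mirror the documented behaviour.

```scala
import java.io.File

object RegisteredExecutorsDbSketch {
  // Models findRegisteredExecutorsDBFile over a comma-separated spark.local.dir value.
  def findRegisteredExecutorsDBFile(localDirSetting: Option[String], dbName: String): File = {
    val localDirs = localDirSetting
      .map(_.split(",").map(_.trim).filter(_.nonEmpty))
      .getOrElse(Array.empty[String])
    if (localDirs.nonEmpty) {
      // Prefer a directory that already holds the DB; otherwise use the first one.
      val dir = localDirs.find(d => new File(d, dbName).exists()).getOrElse(localDirs(0))
      new File(dir, dbName)
    } else {
      System.err.println(
        "WARN 'spark.local.dir' should be set first when we use db in ExternalShuffleService. " +
          "Note that this only affects standalone mode.")
      null
    }
  }
}
```

Preferring a directory that already contains the file lets a restarted service pick up the state it wrote earlier, even if it is not in the first local directory.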
Starting ExternalShuffleService
start(): Unit
start prints out the following INFO message to the logs:
Starting shuffle service on port [port] (auth enabled = [authEnabled])
start creates an AuthServerBootstrap when authentication is enabled (using the SecurityManager).
start creates a TransportContext (with the ExternalBlockHandler) and requests it to create a server (on the port).
start...FIXME
start is used when:
- ExternalShuffleService is requested to startIfEnabled and is launched (as a command-line application)
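The documented parts of start (the INFO message, an authentication bootstrap only when authentication is enabled, and a server on the port that is closed again on stop) can be sketched with hypothetical stand-in types. SketchBootstrap, SketchAuthBootstrap and SketchServer are made up for illustration; they only mimic the roles of Spark's AuthServerBootstrap and TransportServer, and the "already started" guard is an assumption of the sketch.

```scala
// Hypothetical stand-ins for Spark's TransportServerBootstrap,
// AuthServerBootstrap and TransportServer.
sealed trait SketchBootstrap
case object SketchAuthBootstrap extends SketchBootstrap

final class SketchServer(val port: Int, val bootstraps: Seq[SketchBootstrap]) {
  @volatile private var closedFlag = false
  def closed: Boolean = closedFlag
  def close(): Unit = { closedFlag = true }
}

final class StartSketch(port: Int, authEnabled: Boolean) {
  private var server: SketchServer = null

  def currentServer: Option[SketchServer] = Option(server)

  def start(): Unit = {
    require(server == null, "Shuffle server already started")
    println(s"Starting shuffle service on port $port (auth enabled = $authEnabled)")
    // An authentication bootstrap is installed only when authentication is enabled.
    val bootstraps: Seq[SketchBootstrap] =
      if (authEnabled) Seq(SketchAuthBootstrap) else Nil
    // Stands in for the TransportContext creating a server on the port.
    server = new SketchServer(port, bootstraps)
  }

  // Stopping closes the server created by start.
  def stop(): Unit = {
    if (server != null) {
      server.close()
      server = null
    }
  }
}
```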
startIfEnabled
startIfEnabled(): Unit
startIfEnabled starts the external shuffle service if enabled.
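startIfEnabled is a guard around start driven by spark.shuffle.service.enabled. In the hypothetical sketch below the flag is read once from a plain Map[String, String] when the object is created, treating a missing value as false; start only records that it ran.

```scala
final class StartIfEnabledSketch(conf: Map[String, String]) {
  // Read once at construction time; a missing value means disabled.
  val enabled: Boolean =
    conf.get("spark.shuffle.service.enabled").exists(_.trim.toBoolean)

  private var started = false
  def isStarted: Boolean = started

  // Placeholder for the real start logic.
  def start(): Unit = { started = true }

  def startIfEnabled(): Unit = {
    if (enabled) start()
  }
}
```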
startIfEnabled is used when:
- Worker (Spark Standalone) is requested to startExternalShuffleService
Executor Removed Notification
executorRemoved(
executorId: String,
appId: String): Unit
executorRemoved requests the ExternalBlockHandler to executorRemoved.
executorRemoved is used when:
- Worker (Spark Standalone) is requested to handleExecutorStateChanged
Application Finished Notification
applicationRemoved(
appId: String): Unit
applicationRemoved requests the ExternalBlockHandler to applicationRemoved (with cleanupLocalDirs flag enabled).
applicationRemoved is used when:
- Worker (Spark Standalone) is requested to handle a WorkDirCleanup message and maybeCleanupApplication
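Both notifications are plain delegations to the ExternalBlockHandler, with applicationRemoved always asking for the local directories to be cleaned up. The sketch below illustrates that with a hypothetical SketchBlockHandler trait (a made-up subset of the handler's calls) and a recording implementation, so the forwarded arguments can be checked.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical subset of the block handler calls used by the service.
trait SketchBlockHandler {
  def executorRemoved(executorId: String, appId: String): Unit
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit
}

// Records every call it receives, for inspection.
final class RecordingBlockHandler extends SketchBlockHandler {
  val calls = ListBuffer.empty[String]
  def executorRemoved(executorId: String, appId: String): Unit =
    calls += s"executorRemoved($executorId, $appId)"
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit =
    calls += s"applicationRemoved($appId, $cleanupLocalDirs)"
}

final class NotificationSketch(blockHandler: SketchBlockHandler) {
  def executorRemoved(executorId: String, appId: String): Unit =
    blockHandler.executorRemoved(executorId, appId)

  // The application's local directories are always cleaned up here.
  def applicationRemoved(appId: String): Unit =
    blockHandler.applicationRemoved(appId, cleanupLocalDirs = true)
}
```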
Logging
Enable ALL logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.deploy.ExternalShuffleService=ALL
Refer to Logging.