ExternalShuffleService

ExternalShuffleService is a Spark service that serves shuffle blocks from outside an executor process. It runs as a standalone application and manages shuffle output files so that they are available to executors at all times. Because the shuffle output files are managed outside the executors, access to them is uninterrupted even when executors are killed or go down.

You start ExternalShuffleService using the start-shuffle-service.sh shell script, and you enable its use by the driver and executors with the spark.shuffle.service.enabled configuration property.

There is a custom external shuffle service for Spark on YARN: YarnShuffleService.

start-shuffle-service.sh Shell Script

The start-shuffle-service.sh shell script launches ExternalShuffleService. The script is in the sbin directory.

When executed, it loads the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts to set up the environment. It then runs sbin/spark-daemon.sh with the start command and two arguments: the class name org.apache.spark.deploy.ExternalShuffleService and the instance number 1.

$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out

$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: 42918@japila.local
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false
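The hand-off from start-shuffle-service.sh to spark-daemon.sh can be sketched in Scala as follows. This is an illustration, not Spark code: the object and method names are hypothetical, and the log file naming scheme (spark-<ident>-<class>-<instance>-<host>.out, where <ident> is usually the user name) is inferred from the log path in the output above.

```scala
// A sketch, not Spark's code: the spark-daemon.sh invocation made by
// start-shuffle-service.sh and the log file name that results from it.
object ShuffleServiceLauncherSketch {
  val ShuffleServiceClass = "org.apache.spark.deploy.ExternalShuffleService"

  // Command line run by start-shuffle-service.sh: command, class, instance number.
  def daemonCommand(instance: Int = 1): Seq[String] =
    Seq("sbin/spark-daemon.sh", "start", ShuffleServiceClass, instance.toString)

  // Assumed pattern, inferred from the sample output:
  // spark-<ident>-<class>-<instance>-<host>.out (<ident> is usually $USER).
  def logFileName(ident: String, instance: Int, host: String): String =
    s"spark-$ident-$ShuffleServiceClass-$instance-$host.out"
}
```

With ident "jacek", instance 1 and host "japila.local", logFileName produces the same file name as the one in the sample output, which is why the instance number 1 shows up in it.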

You can also launch ExternalShuffleService in the foreground using the spark-class script.

$ ./bin/spark-class org.apache.spark.deploy.ExternalShuffleService

Launching ExternalShuffleService Standalone Application

When started, it executes Utils.initDaemon(log), which prints the "Started daemon with process name" INFO message seen in the log output above.

It then loads the default Spark properties (from the spark-defaults.conf properties file) and creates a SecurityManager.
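Loading the default properties can be sketched with java.util.Properties, which understands the "key value" and "key=value" lines of a properties file such as spark-defaults.conf. The object and method names are hypothetical, and two behaviours are assumptions of this sketch: only keys starting with spark. are kept, and settings already present in the configuration win over the file's defaults.

```scala
import java.io.StringReader
import java.util.Properties
import scala.jdk.CollectionConverters._

// Sketch: parse spark-defaults.conf-style text and merge it into existing settings.
object DefaultPropertiesSketch {
  // Keeps only spark.* keys (an assumption of this sketch) and trims the values.
  def loadSparkDefaults(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text)) // handles "key value", "key=value" and # comments
    props.stringPropertyNames().asScala
      .filter(_.startsWith("spark."))
      .map(key => key -> props.getProperty(key).trim)
      .toMap
  }

  // Settings that are already present win over the file's defaults (assumed precedence).
  def mergeDefaults(current: Map[String, String], defaults: Map[String, String]): Map[String, String] =
    defaults ++ current
}
```

Parsing with java.util.Properties rather than by hand means comments, both separator styles and escaped characters are handled the same way the JDK handles any properties file.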

An ExternalShuffleService is then created and started.

Finally, a shutdown hook is registered so that, when the process shuts down, the following INFO message is printed to the logs and the stop method is executed:

Shutting down shuffle service.
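The launch sequence and its shutdown hook can be sketched in plain Scala. FakeShuffleService is a hypothetical stand-in for ExternalShuffleService, and the CountDownLatch that keeps the main thread waiting until the hook runs is an assumption of this sketch; the hook itself logs the message above and calls stop, as described.

```scala
import java.util.concurrent.CountDownLatch

// Sketch of the launch sequence; not Spark's actual main method.
object ShuffleServiceMainSketch {
  // Hypothetical stand-in for ExternalShuffleService that only tracks its state.
  final class FakeShuffleService {
    @volatile var running: Boolean = false
    def start(): Unit = { running = true }
    def stop(): Unit = { running = false }
  }

  // Body of the shutdown hook: log, stop the service, release the waiting main thread.
  def onShutdown(service: FakeShuffleService, barrier: CountDownLatch, log: String => Unit): Unit = {
    log("Shutting down shuffle service.")
    service.stop()
    barrier.countDown()
  }

  // Start the service, register the hook, then block until the hook has run.
  def run(service: FakeShuffleService, log: String => Unit): Unit = {
    val barrier = new CountDownLatch(1)
    service.start()
    sys.addShutdownHook(onShutdown(service, barrier, log))
    barrier.await() // the latch-based wait is an assumption of this sketch
  }
}
```

Keeping the hook body in its own method (onShutdown) makes it testable without shutting the JVM down.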

When an executor registers with the shuffle service, you should see the following INFO message in the logs:

Registered executor [AppExecId] with [executorInfo]

When a SparkContext is stopped and its application is removed, you should also see the following messages:

Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
Successfully cleaned up directory: [localDir]
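These messages follow from the service keeping track of registered executors per application. A minimal in-memory sketch, with hypothetical AppExecId, ExecutorInfo and Registry types, illustrates why shuffle files can outlive executors: an entry is only dropped when the whole application is removed, and local directories are only cleaned up when cleanupLocalDirs is true. The sketch only logs the directories it would delete.

```scala
import scala.collection.mutable

// Sketch of executor bookkeeping in a shuffle service; all types are hypothetical.
object ExecutorRegistrySketch {
  final case class AppExecId(appId: String, execId: String)
  final case class ExecutorInfo(localDirs: Seq[String])

  final class Registry(log: String => Unit) {
    private val executors = mutable.LinkedHashMap.empty[AppExecId, ExecutorInfo]

    def registerExecutor(appId: String, execId: String, info: ExecutorInfo): Unit = {
      val id = AppExecId(appId, execId)
      executors(id) = info
      log(s"Registered executor $id with $info")
    }

    // Entries survive executor loss; they are dropped only when the application goes away.
    def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit = {
      log(s"Application $appId removed, cleanupLocalDirs = $cleanupLocalDirs")
      val ids = executors.keys.filter(_.appId == appId).toList
      ids.foreach { id =>
        val info = executors.remove(id).get
        if (cleanupLocalDirs) {
          log(s"Cleaning up executor $id's ${info.localDirs.length} local dirs")
          // A real service would delete each directory here; the sketch only logs it.
          info.localDirs.foreach(dir => log(s"Successfully cleaned up directory: $dir"))
        }
      }
    }

    def isRegistered(appId: String, execId: String): Boolean =
      executors.contains(AppExecId(appId, execId))
  }
}
```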

Creating Instance

ExternalShuffleService takes a SparkConf and a SecurityManager when created.

When created, it reads the spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration properties. It also checks whether authentication is enabled using securityManager.isAuthenticationEnabled() and keeps the result as useSasl.

ExternalShuffleService creates a TransportConf (as transportConf).

It creates an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext) that combines transportConf with blockHandler.

The internal TransportServer (as server) is not created yet; it stays null until start is called.
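The settings read at construction time can be sketched as a pure function over a Map[String, String]. The Settings type and the object name are hypothetical and the real service reads a SparkConf; the defaults (disabled, port 7337) come from the description above, and treating the authentication flag as useSasl is an assumption of this sketch.

```scala
// Sketch of the configuration read when the service is created; not Spark's code.
object ShuffleServiceConfSketch {
  // Hypothetical holder for the values the service keeps.
  final case class Settings(enabled: Boolean, port: Int, useSasl: Boolean)

  def fromConf(conf: Map[String, String], authenticationEnabled: Boolean): Settings =
    Settings(
      enabled = conf.get("spark.shuffle.service.enabled").exists(_.trim.toBoolean), // default false
      port = conf.get("spark.shuffle.service.port").map(_.trim.toInt).getOrElse(7337),
      useSasl = authenticationEnabled
    )
}
```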

Starting ExternalShuffleService

start(): Unit

start starts the shuffle server so that it accepts connections on the configured port.

When start is executed, you should see the following INFO message in the logs:

Starting shuffle service on port [port] with useSasl = [useSasl]

If useSasl is enabled, a SaslServerBootstrap is created so that incoming connections are authenticated using SASL.

The internal server reference (a TransportServer) is then created through transportContext with port and the bootstraps; creating the server attempts to bind it to port.
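The steps of start can be sketched with a plain java.net.ServerSocket standing in for the TransportServer. Bootstrap and SaslBootstrap are hypothetical stand-ins for SaslServerBootstrap; the sketch only shows the order of events: log the message, choose the bootstraps, then create (and thereby bind) the server.

```scala
import java.net.{InetSocketAddress, ServerSocket}

// Sketch of start(); a ServerSocket plays the role of the TransportServer.
object ShuffleServiceStartSketch {
  sealed trait Bootstrap
  case object SaslBootstrap extends Bootstrap // hypothetical stand-in for SaslServerBootstrap

  def startMessage(port: Int, useSasl: Boolean): String =
    s"Starting shuffle service on port $port with useSasl = $useSasl"

  // Only a SASL-enabled service gets an authentication bootstrap.
  def bootstraps(useSasl: Boolean): Seq[Bootstrap] =
    if (useSasl) Seq(SaslBootstrap) else Nil

  // Creating the server binds it; a port that is already in use fails here.
  def createServer(port: Int): ServerSocket = {
    val socket = new ServerSocket()
    socket.bind(new InetSocketAddress(port))
    socket
  }

  def start(port: Int, useSasl: Boolean, log: String => Unit): (ServerSocket, Seq[Bootstrap]) = {
    log(startMessage(port, useSasl))
    val chosen = bootstraps(useSasl)
    (createServer(port), chosen)
  }
}
```

Passing port 0 lets the operating system pick a free port, which keeps experiments from clashing with a shuffle service already running on 7337.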

Stopping ExternalShuffleService

stop(): Unit

stop closes the internal server, if it has been started, and clears the reference (i.e. sets it to null).
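The stop semantics can be sketched with any java.io.Closeable playing the role of the server. The Service class and its createServer parameter are hypothetical; checking for null before closing makes a second stop() call a no-op, which is an assumption of this sketch.

```scala
import java.io.Closeable

// Sketch of stop(); any Closeable stands in for the TransportServer.
object ShuffleServiceStopSketch {
  final class Service(createServer: () => Closeable) {
    private var server: Closeable = null // null until start() is called

    def start(): Unit = { server = createServer() }

    // Close the server if it is running and forget it, so stop() is safe to call twice.
    def stop(): Unit = {
      if (server != null) {
        server.close()
        server = null
      }
    }

    def isRunning: Boolean = server != null
  }
}
```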

Logging

Enable the ALL logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.ExternalShuffleService=ALL

Refer to Logging.