MapOutputTrackerMaster

MapOutputTrackerMaster is the MapOutputTracker for the driver.

A MapOutputTrackerMaster is the source of truth of shuffle map output locations.

Creating Instance

MapOutputTrackerMaster takes the following to be created:

MapOutputTrackerMaster starts dispatcher threads on the map-output-dispatcher thread pool.

MapOutputTrackerMaster and BroadcastManager

MapOutputTrackerMaster is given a BroadcastManager when created.

Shuffle Map Output Status Registry

MapOutputTrackerMaster uses an internal registry of ShuffleStatuses by shuffle stages.

MapOutputTrackerMaster adds a new shuffle when requested to register one (when DAGScheduler is requested to create a ShuffleMapStage for a ShuffleDependency).

MapOutputTrackerMaster uses the registry when requested for the following:

MapOutputTrackerMaster removes (clears) all shuffles when requested to stop.

Configuration Properties

Map and Reduce Task Thresholds for Preferred Locations

MapOutputTrackerMaster defines 1000 (tasks) as the hardcoded threshold of the number of map and reduce tasks when requested to compute preferred locations with spark.shuffle.reduceLocality.enabled.

Map Output Threshold for Preferred Location of Reduce Tasks

MapOutputTrackerMaster defines 0.2 as the fraction of total map output that must be at a location for it to be considered as a preferred location for a reduce task.

Making this larger will focus on fewer locations where most data can be read locally, but may lead to more delay in scheduling if those locations are busy.

MapOutputTrackerMaster uses the fraction when requested for the preferred locations of shuffle RDDs.

GetMapOutputMessage Queue

MapOutputTrackerMaster uses a blocking queue (a Java LinkedBlockingQueue) for requests for map output statuses.

GetMapOutputMessage(shuffleId: Int, context: RpcCallContext)

GetMapOutputMessage holds the shuffle ID and the RpcCallContext of the caller.

A new GetMapOutputMessage is added to the queue when MapOutputTrackerMaster is requested to post one.

MapOutputTrackerMaster uses MessageLoop Dispatcher Threads to process GetMapOutputMessages.

MessageLoop Dispatcher Thread

MessageLoop is a thread of execution to handle GetMapOutputMessages until a PoisonPill marker message arrives (posted when MapOutputTrackerMaster stops).

MessageLoop takes a GetMapOutputMessage and prints out the following DEBUG message to the logs:

Handling request to send map output locations for shuffle [shuffleId] to [hostPort]

MessageLoop then finds the ShuffleStatus by the shuffle ID in the shuffleStatuses internal registry and replies back (to the RPC client) with a serialized map output status (with the BroadcastManager and spark.shuffle.mapOutput.minSizeForBroadcast configuration property).

MessageLoop threads run on the map-output-dispatcher Thread Pool.

map-output-dispatcher Thread Pool

threadpool: ThreadPoolExecutor

threadpool is a fixed thread pool of daemon threads whose names use the map-output-dispatcher prefix.

threadpool uses spark.shuffle.mapOutput.dispatcher.numThreads configuration property for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage messages.

The dispatcher threads are started immediately when MapOutputTrackerMaster is created.

The thread pool is shut down when MapOutputTrackerMaster is requested to stop.
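
The queue, the dispatcher threads and the shutdown described above can be sketched in plain Scala without any Spark dependency. This is a minimal sketch and not the Spark implementation: Request is a hypothetical stand-in for GetMapOutputMessage (without the RpcCallContext), DispatcherSketch, handled, the -99 marker value and the 5-second timeout are made up for illustration, and handing the marker on to the remaining loops is an assumption of this sketch.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, LinkedBlockingQueue, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for GetMapOutputMessage (the real one also carries an RpcCallContext).
final case class Request(shuffleId: Int)

// A blocking queue drained by a fixed pool of daemon threads.
class DispatcherSketch(numThreads: Int) {
  // Marker message that tells a loop to exit (compared by reference).
  private val poisonPill = Request(-99)
  private val requests = new LinkedBlockingQueue[Request]()
  // Records the shuffle IDs that were processed (for illustration only).
  val handled = new ConcurrentLinkedQueue[Int]()

  private val threadIds = new AtomicInteger(0)
  private val pool = Executors.newFixedThreadPool(numThreads, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "map-output-dispatcher-" + threadIds.getAndIncrement())
      t.setDaemon(true)
      t
    }
  })

  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        val msg = requests.take() // blocks until a request arrives
        if (msg eq poisonPill) {
          requests.offer(poisonPill) // hand the marker on so the other loops stop, too
          running = false
        } else {
          handled.add(msg.shuffleId) // a real loop would reply to the caller here
        }
      }
    }
  }

  // Dispatcher threads are started as soon as the instance is created.
  (1 to numThreads).foreach(_ => pool.execute(new MessageLoop))

  def post(message: Request): Unit = requests.offer(message)

  // Posts the marker, shuts the pool down and waits for the loops to finish.
  def stop(): Boolean = {
    requests.offer(poisonPill)
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Because the queue is first-in, first-out, requests posted before stop are taken off the queue before the marker is.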

Epoch Number

MapOutputTrackerMaster uses an epoch number to…​FIXME

getEpoch is used when:

Enqueueing GetMapOutputMessage

post(
  message: GetMapOutputMessage): Unit

post simply adds the input GetMapOutputMessage to the mapOutputRequests internal queue.

post is used when MapOutputTrackerMasterEndpoint is requested to handle a GetMapOutputStatuses message.

Stopping MapOutputTrackerMaster

stop(): Unit

stop…​FIXME

stop is part of the MapOutputTracker abstraction.

Unregistering Shuffle Map Output

unregisterMapOutput(
  shuffleId: Int,
  mapId: Int,
  bmAddress: BlockManagerId): Unit

unregisterMapOutput…​FIXME

unregisterMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).

Computing Preferred Locations (with Most Shuffle Map Outputs)

getPreferredLocationsForShuffle(
  dep: ShuffleDependency[_, _, _],
  partitionId: Int): Seq[String]

getPreferredLocationsForShuffle computes the locations (BlockManagers) with the most shuffle map outputs for the input ShuffleDependency and Partition.

getPreferredLocationsForShuffle computes the locations when all of the following are met:

getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.

Internally, getPreferredLocationsForShuffle checks that the spark.shuffle.reduceLocality.enabled configuration property is enabled (it is by default) and that both the number of partitions of the RDD of the input ShuffleDependency and the number of partitions in the partitioner of the input ShuffleDependency are less than 1000.

The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.

If the condition holds, getPreferredLocationsForShuffle finds locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (with the number of partitions in the partitioner of the input ShuffleDependency and 0.2) and returns the hosts of the preferred BlockManagers.

0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.

getPreferredLocationsForShuffle is used when ShuffledRDD and Spark SQL’s ShuffledRowRDD are requested for preferred locations of a partition.
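
The guard condition can be sketched as follows. This is an illustration only: PreferredLocationsSketch, preferredHosts and the constant names are made up, and lookup is a hypothetical stand-in for getLocationsWithLargestOutputs that already returns host names.

```scala
object PreferredLocationsSketch {
  // Values taken from the text above; the constant names are made up for this sketch.
  val MapThreshold = 1000
  val ReduceThreshold = 1000
  val PrefLocsFraction = 0.2

  // `lookup` stands in for getLocationsWithLargestOutputs:
  // (numReducers, fractionThreshold) => hosts, if any location qualifies.
  def preferredHosts(
      reduceLocalityEnabled: Boolean,
      numMapPartitions: Int,
      numReducePartitions: Int)(
      lookup: (Int, Double) => Option[Seq[String]]): Seq[String] = {
    if (reduceLocalityEnabled &&
        numMapPartitions < MapThreshold &&
        numReducePartitions < ReduceThreshold) {
      lookup(numReducePartitions, PrefLocsFraction).getOrElse(Nil)
    } else {
      Nil // guard not met: no locality preference is reported
    }
  }
}
```

With the property disabled, or with 1000 or more partitions on either side, the sketch returns no preferred hosts without calling lookup at all.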

Incrementing Epoch

incrementEpoch(): Unit

incrementEpoch increments the internal epoch.

incrementEpoch prints out the following DEBUG message to the logs:

Increasing epoch to [epoch]

incrementEpoch is used when:

Checking Availability of Shuffle Map Output Status

containsShuffle(
  shuffleId: Int): Boolean

containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.

containsShuffle is used when DAGScheduler is requested to create a ShuffleMapStage (for a ShuffleDependency).

Registering Shuffle

registerShuffle(
  shuffleId: Int,
  numMaps: Int): Unit

registerShuffle adds the input shuffle ID and the number of partitions (as a ShuffleStatus) to the shuffleStatuses internal registry.

If the shuffle ID has already been registered, registerShuffle throws an IllegalArgumentException:

Shuffle ID [shuffleId] registered twice

registerShuffle is used when DAGScheduler is requested to create a ShuffleMapStage (for a ShuffleDependency).
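
The register-once behaviour can be sketched with a concurrent map. ShuffleStatusSketch and ShuffleRegistrySketch are hypothetical names, String stands in for MapStatus, and using TrieMap.putIfAbsent is a choice made for this sketch, not a statement about how Spark stores the registry.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for ShuffleStatus: one slot per map partition.
final class ShuffleStatusSketch(val numPartitions: Int) {
  val mapStatuses = new Array[String](numPartitions) // all null: no outputs registered yet
}

class ShuffleRegistrySketch {
  private val shuffleStatuses = TrieMap.empty[Int, ShuffleStatusSketch]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    // putIfAbsent returns the previous value if the key was already present.
    if (shuffleStatuses.putIfAbsent(shuffleId, new ShuffleStatusSketch(numMaps)).isDefined) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }

  def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)
}
```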

Registering Map Outputs for Shuffle (Possibly with Epoch Change)

registerMapOutputs(
  shuffleId: Int,
  statuses: Array[MapStatus],
  changeEpoch: Boolean = false): Unit

registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.

registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).

registerMapOutputs is used when DAGScheduler handles successful ShuffleMapTask completion and executor lost events.
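
How changeEpoch ties registration to the epoch can be sketched as below. MapOutputsSketch and outputs are hypothetical names, String stands in for MapStatus, and the DEBUG logging of incrementEpoch is left out.

```scala
import scala.collection.concurrent.TrieMap

class MapOutputsSketch {
  private val epochLock = new Object
  private var epoch = 0L
  // shuffle ID -> one entry per map partition (String stands in for MapStatus)
  private val mapStatuses = TrieMap.empty[Int, Array[String]]

  def getEpoch: Long = epochLock.synchronized(epoch)

  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  def registerMapOutputs(
      shuffleId: Int,
      statuses: Array[String],
      changeEpoch: Boolean = false): Unit = {
    mapStatuses.put(shuffleId, statuses.clone()) // replaces any earlier outputs of the shuffle
    if (changeEpoch) incrementEpoch()            // off by default
  }

  def outputs(shuffleId: Int): Option[Array[String]] = mapStatuses.get(shuffleId)
}
```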

Finding Serialized Map Output Statuses (And Possibly Broadcasting Them)

getSerializedMapOutputStatuses(
  shuffleId: Int): Array[Byte]

getSerializedMapOutputStatuses finds cached serialized map statuses for the input shuffleId.

If found, getSerializedMapOutputStatuses returns the cached serialized map statuses.

Otherwise, getSerializedMapOutputStatuses acquires the shuffle lock for shuffleId and looks up the cached serialized map statuses again, since some other thread could have updated the cachedSerializedStatuses internal cache in the meantime.

getSerializedMapOutputStatuses returns the serialized map statuses if found.

If not, getSerializedMapOutputStatuses serializes the local array of MapStatuses (from checkCachedStatuses).

You should see the following INFO message in the logs:

Size of output statuses for shuffle [shuffleId] is [bytes] bytes

getSerializedMapOutputStatuses saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses also saves its broadcast version in cachedSerializedBroadcast internal cache.

If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and you should see the following INFO message in the logs:

Epoch changed, not caching!

When the epoch has changed, getSerializedMapOutputStatuses also removes the broadcast.

getSerializedMapOutputStatuses returns the serialized map statuses.

getSerializedMapOutputStatuses is used when MapOutputTrackerMaster responds to GetMapOutputMessage requests and DAGScheduler creates ShuffleMapStage for ShuffleDependency (copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).
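
The lookup, the second check under the shuffle lock and the epoch comparison before caching can be sketched as follows. This is a simplified illustration under stated assumptions: SerializedStatusCacheSketch, checkCached, getSerialized and shuffleLock are hypothetical names, serialize is a caller-supplied function, and the broadcast variant is left out entirely.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

class SerializedStatusCacheSketch(serialize: Int => Array[Byte]) {
  private val epochLock = new Object
  private var epoch = 0L
  private var cacheEpoch = 0L
  // Accessed only while holding epochLock.
  private val cachedSerializedStatuses = mutable.Map.empty[Int, Array[Byte]]
  private val shuffleLocks = TrieMap.empty[Int, Object]

  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  // One lock object per shuffle ID, created on first use.
  private def shuffleLock(shuffleId: Int): Object = {
    val fresh = new Object
    shuffleLocks.putIfAbsent(shuffleId, fresh).getOrElse(fresh)
  }

  // Drops stale entries if the epoch moved on, then returns the cached bytes
  // (if any) together with the epoch that was observed.
  private def checkCached(shuffleId: Int): (Option[Array[Byte]], Long) =
    epochLock.synchronized {
      if (epoch > cacheEpoch) {
        cachedSerializedStatuses.clear()
        cacheEpoch = epoch
      }
      (cachedSerializedStatuses.get(shuffleId), epoch)
    }

  def getSerialized(shuffleId: Int): Array[Byte] =
    checkCached(shuffleId)._1.getOrElse {
      shuffleLock(shuffleId).synchronized {
        // Check again: another thread may have filled the cache while we waited.
        val (cached, epochGotten) = checkCached(shuffleId)
        cached.getOrElse {
          val bytes = serialize(shuffleId)
          epochLock.synchronized {
            // Cache only if the epoch did not change while serializing.
            if (epoch == epochGotten) cachedSerializedStatuses(shuffleId) = bytes
          }
          bytes
        }
      }
    }
}
```

In this sketch a second request for the same shuffle ID is served from the cache, and a request made after incrementEpoch serializes again.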

Finding Cached Serialized Map Statuses

checkCachedStatuses(): Boolean

checkCachedStatuses is an internal helper method that getSerializedMapOutputStatuses uses to do some bookkeeping (when the epoch and cacheEpoch differ) and set local statuses, retBytes and epochGotten (that getSerializedMapOutputStatuses uses).

Internally, checkCachedStatuses acquires the epochLock lock and compares epoch with the cached cacheEpoch.

If epoch is younger (i.e. greater), checkCachedStatuses clears cachedSerializedStatuses internal cache, cached broadcasts and sets cacheEpoch to be epoch.

checkCachedStatuses gets the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses).

When the serialized map output status is found, checkCachedStatuses saves it in a local retBytes and returns true.

When not found, you should see the following DEBUG message in the logs:

cached status not found for : [shuffleId]

checkCachedStatuses uses the mapStatuses internal cache to get the map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses), or falls back to an empty array, and assigns the result to the local statuses. checkCachedStatuses sets the local epochGotten to the current epoch and returns false.

Registering Shuffle Map Output

registerMapOutput(
  shuffleId: Int,
  mapId: Int,
  status: MapStatus): Unit

registerMapOutput finds the ShuffleStatus by the given shuffle ID and adds the given MapStatus:

registerMapOutput is used when DAGScheduler is requested to handle a ShuffleMapTask completion.

Calculating Shuffle Map Output Statistics

getStatistics(
  dep: ShuffleDependency[_, _, _]): MapOutputStatistics

getStatistics…​FIXME

getStatistics is used when DAGScheduler is requested to handle a ShuffleMapStage submission (and the stage has finished) and markMapStageJobsAsFinished.

Deregistering All Map Outputs of Shuffle Stage

unregisterAllMapOutput(
  shuffleId: Int): Unit

unregisterAllMapOutput…​FIXME

unregisterAllMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).

Deregistering Shuffle

unregisterShuffle(
  shuffleId: Int): Unit

unregisterShuffle…​FIXME

unregisterShuffle is part of the MapOutputTracker abstraction.

Deregistering Shuffle Outputs Associated with Host

removeOutputsOnHost(
  host: String): Unit

removeOutputsOnHost…​FIXME

removeOutputsOnHost is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs and handle a worker removal.

Deregistering Shuffle Outputs Associated with Executor

removeOutputsOnExecutor(
  execId: String): Unit

removeOutputsOnExecutor…​FIXME

removeOutputsOnExecutor is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs.

Number of Partitions with Shuffle Map Outputs Available

getNumAvailableOutputs(
  shuffleId: Int): Int

getNumAvailableOutputs…​FIXME

getNumAvailableOutputs is used when ShuffleMapStage is requested for the number of partitions with shuffle outputs available.

Finding Missing Partitions

findMissingPartitions(
  shuffleId: Int): Option[Seq[Int]]

findMissingPartitions…​FIXME

findMissingPartitions is used when ShuffleMapStage is requested for missing partitions.

Finding Locations with Blocks and Sizes

getMapSizesByExecutorId(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])]

getMapSizesByExecutorId…​FIXME

getMapSizesByExecutorId is part of the MapOutputTracker abstraction.

Finding Locations with Largest Number of Shuffle Map Outputs

getLocationsWithLargestOutputs(
  shuffleId: Int,
  reducerId: Int,
  numReducers: Int,
  fractionThreshold: Double): Option[Array[BlockManagerId]]

getLocationsWithLargestOutputs returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold (given the total size of all the shuffle blocks for the shuffle across all BlockManagers).

getLocationsWithLargestOutputs may return no BlockManagerId if their shuffle blocks do not total up above the input fractionThreshold.
The input numReducers is not used.

Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.

One entry in mapStatuses internal cache is a MapStatus array indexed by partition id.

getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping from each BlockManagerId to the cumulative size of the shuffle blocks it holds.

getLocationsWithLargestOutputs is used when MapOutputTrackerMaster is requested for the preferred locations of a shuffle.
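
The size-per-location computation can be sketched in plain Scala. MapOutput is a hypothetical stand-in for MapStatus, host names stand in for BlockManagerIds, and the unused numReducers parameter is dropped. Treating a share exactly equal to fractionThreshold as qualifying is an assumption of this sketch.

```scala
import scala.collection.mutable

object LargestOutputsSketch {
  // Hypothetical stand-in for MapStatus: where one map output lives and
  // how many bytes it holds for each reduce partition.
  final case class MapOutput(location: String, sizesByReducer: Array[Long])

  def locationsWithLargestOutputs(
      statuses: Seq[MapOutput],
      reducerId: Int,
      fractionThreshold: Double): Option[Seq[String]] = {
    val sizeByLocation = mutable.LinkedHashMap.empty[String, Long]
    var total = 0L
    statuses.foreach { status =>
      if (status != null) { // a missing map output contributes nothing
        val size = status.sizesByReducer(reducerId)
        if (size > 0) {
          sizeByLocation(status.location) = sizeByLocation.getOrElse(status.location, 0L) + size
          total += size
        }
      }
    }
    // Keep the locations whose share of the total reaches the threshold.
    val top = sizeByLocation.collect {
      case (location, size) if total > 0 && size.toDouble / total >= fractionThreshold => location
    }.toSeq
    if (top.nonEmpty) Some(top) else None
  }
}
```

For example, if hostA holds 90 of 100 bytes for a reducer and hostB holds 10, only hostA qualifies with a threshold of 0.2 (shares of 0.9 and 0.1).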

Logging

Enable ALL logging level for org.apache.spark.MapOutputTrackerMaster logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.MapOutputTrackerMaster=ALL

Refer to Logging.