MapOutputTrackerMaster

MapOutputTrackerMaster is a MapOutputTracker for the driver.

MapOutputTrackerMaster is the source of truth for shuffle map output locations.

Creating Instance

MapOutputTrackerMaster takes the following to be created:

When created, MapOutputTrackerMaster starts dispatcher threads on the map-output-dispatcher thread pool.

MapOutputTrackerMaster is created when:

maxRpcMessageSize

maxRpcMessageSize is...FIXME

BroadcastManager

MapOutputTrackerMaster is given a BroadcastManager when created.

Shuffle Map Output Status Registry

MapOutputTrackerMaster uses an internal registry of ShuffleStatuses by shuffle stages.

MapOutputTrackerMaster adds a new shuffle when requested to register one (when DAGScheduler is requested to create a ShuffleMapStage for a ShuffleDependency).

MapOutputTrackerMaster uses the registry when requested for the following:

MapOutputTrackerMaster removes (clears) all shuffles when requested to stop.

Configuration Properties

MapOutputTrackerMaster uses the following configuration properties:

Map and Reduce Task Thresholds for Preferred Locations

MapOutputTrackerMaster defines 1000 (tasks) as the hardcoded threshold of the number of map and reduce tasks when requested to compute preferred locations with spark.shuffle.reduceLocality.enabled.

Map Output Threshold for Preferred Location of Reduce Tasks

MapOutputTrackerMaster defines 0.2 as the fraction of total map output that must be at a location for it to be considered as a preferred location for a reduce task.

Making this larger will focus on fewer locations where most data can be read locally, but may lead to more delay in scheduling if those locations are busy.

MapOutputTrackerMaster uses the fraction when requested for the preferred locations of shuffle RDDs.

GetMapOutputMessage Queue

MapOutputTrackerMaster uses a blocking queue (a Java LinkedBlockingQueue) for requests for map output statuses.

GetMapOutputMessage(
  shuffleId: Int,
  context: RpcCallContext)

GetMapOutputMessage holds the shuffle ID and the RpcCallContext of the caller.

A new GetMapOutputMessage is added to the queue when MapOutputTrackerMaster is requested to post one.

MapOutputTrackerMaster uses MessageLoop Dispatcher Threads to process GetMapOutputMessages.

MessageLoop Dispatcher Thread

MessageLoop is a thread of execution to handle GetMapOutputMessages until a PoisonPill marker message arrives (when MapOutputTrackerMaster is requested to stop).

MessageLoop takes a GetMapOutputMessage and prints out the following DEBUG message to the logs:

Handling request to send map output locations for shuffle [shuffleId] to [hostPort]

MessageLoop then finds the ShuffleStatus by the shuffle ID in the shuffleStatuses internal registry and replies back (to the RPC client) with a serialized map output status (with the BroadcastManager and spark.shuffle.mapOutput.minSizeForBroadcast configuration property).

MessageLoop threads run on the map-output-dispatcher Thread Pool.
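
The queue-and-loop arrangement described above can be sketched as follows. This is a minimal illustration, not the Spark source: Request is a hypothetical stand-in for GetMapOutputMessage (a plain callback replaces the RpcCallContext), lookup stands in for finding and serializing the ShuffleStatus, and putting the PoisonPill back on the queue so that sibling loops also stop is an assumption of this sketch.

```scala
import java.util.concurrent.LinkedBlockingQueue

object MessageLoopSketch {
  // Hypothetical stand-in for GetMapOutputMessage; reply replaces the RpcCallContext.
  final case class Request(shuffleId: Int, reply: Array[Byte] => Unit)

  // Marker message that tells a loop to exit (assumed shape, compared by reference).
  val PoisonPill: Request = Request(-99, _ => ())

  // Blocking queue of pending requests for map output statuses.
  val requests = new LinkedBlockingQueue[Request]()

  // post simply enqueues the request.
  def post(message: Request): Unit = requests.put(message)

  // One dispatcher loop: takes requests until the PoisonPill arrives.
  final class MessageLoop(lookup: Int => Array[Byte]) extends Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        val message = requests.take() // blocks while the queue is empty
        if (message eq PoisonPill) {
          // Put the pill back so other loops sharing the queue stop too.
          requests.put(PoisonPill)
          running = false
        } else {
          message.reply(lookup(message.shuffleId))
        }
      }
    }
  }
}
```

Because take blocks, an idle loop costs nothing until a request or the marker arrives.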

map-output-dispatcher Thread Pool

threadpool: ThreadPoolExecutor

threadpool is a daemon fixed thread pool registered with map-output-dispatcher thread name prefix.

threadpool uses spark.shuffle.mapOutput.dispatcher.numThreads configuration property for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage messages.

The dispatcher threads are started immediately when MapOutputTrackerMaster is created.

The thread pool is shut down when MapOutputTrackerMaster is requested to stop.
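
A daemon fixed thread pool with a thread name prefix can be built with plain java.util.concurrent classes, roughly as below. The object and method names are illustrative and the thread numbering scheme is an assumption; the sketch only shows the idea of a fixed number of daemon threads that share a prefix.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object DispatcherPoolSketch {
  // Creates a fixed-size pool whose threads are daemons named "<prefix>-<n>".
  def newDaemonFixedThreadPool(numThreads: Int, prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        thread.setDaemon(true) // daemon threads do not keep the JVM alive
        thread
      }
    }
    // Executors.newFixedThreadPool is backed by a ThreadPoolExecutor.
    Executors.newFixedThreadPool(numThreads, factory).asInstanceOf[ThreadPoolExecutor]
  }
}
```

In this sketch, the number of threads would come from spark.shuffle.mapOutput.dispatcher.numThreads, and one dispatcher loop would be submitted per thread right after the pool is created.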

Epoch Number

MapOutputTrackerMaster uses an epoch number to...FIXME

getEpoch is used when:

Enqueueing GetMapOutputMessage

post(
  message: GetMapOutputMessage): Unit

post simply adds the input GetMapOutputMessage to the mapOutputRequests internal queue.

post is used when MapOutputTrackerMasterEndpoint is requested to handle a GetMapOutputStatuses message.

Stopping MapOutputTrackerMaster

stop(): Unit

stop...FIXME

stop is part of the MapOutputTracker abstraction.

Unregistering Shuffle Map Output

unregisterMapOutput(
  shuffleId: Int,
  mapId: Int,
  bmAddress: BlockManagerId): Unit

unregisterMapOutput...FIXME

unregisterMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).

Computing Preferred Locations

getPreferredLocationsForShuffle(
  dep: ShuffleDependency[_, _, _],
  partitionId: Int): Seq[String]

getPreferredLocationsForShuffle computes the locations (BlockManagers) with the most shuffle map outputs for the input ShuffleDependency and Partition.

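getPreferredLocationsForShuffle computes the locations only when the spark.shuffle.reduceLocality.enabled configuration property is enabled and both the RDD and the partitioner of the input ShuffleDependency have fewer than 1000 partitions.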

Note

getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.

Internally, getPreferredLocationsForShuffle checks whether the spark.shuffle.reduceLocality.enabled configuration property is enabled and whether both the number of partitions of the RDD (of the input ShuffleDependency) and the number of partitions of the partitioner (of the input ShuffleDependency) are less than 1000.

Note

The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.

If the condition holds, getPreferredLocationsForShuffle finds locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (with the number of partitions in the partitioner of the input ShuffleDependency and 0.2) and returns the hosts of the preferred BlockManagers.

Note

0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.

getPreferredLocationsForShuffle is used when ShuffledRDD and Spark SQL's ShuffledRowRDD are requested for preferred locations of a partition.
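
The guard condition can be sketched as below. The constant and parameter names are illustrative, and largestOutputs is a hypothetical stand-in for getLocationsWithLargestOutputs that returns host names directly to keep the example self-contained.

```scala
object PreferredLocationsSketch {
  // Hardcoded thresholds quoted in the text above (names are illustrative).
  val MapThreshold = 1000
  val ReduceThreshold = 1000
  val FractionThreshold = 0.2

  // largestOutputs is a hypothetical stand-in for getLocationsWithLargestOutputs.
  def preferredLocations(
      reduceLocalityEnabled: Boolean,
      numMapPartitions: Int,
      numReducePartitions: Int,
      largestOutputs: Double => Option[Seq[String]]): Seq[String] = {
    if (reduceLocalityEnabled &&
        numMapPartitions < MapThreshold &&
        numReducePartitions < ReduceThreshold) {
      // No location above the fraction means no preference.
      largestOutputs(FractionThreshold).getOrElse(Seq.empty)
    } else {
      Seq.empty
    }
  }
}
```

An empty result simply means the scheduler gets no locality hint for the reduce task.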

Finding Locations with Largest Number of Shuffle Map Outputs

getLocationsWithLargestOutputs(
  shuffleId: Int,
  reducerId: Int,
  numReducers: Int,
  fractionThreshold: Double): Option[Array[BlockManagerId]]

getLocationsWithLargestOutputs returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold (given the total size of all the shuffle blocks for the shuffle across all BlockManagers).

Note

getLocationsWithLargestOutputs returns no BlockManagerId (None) when no BlockManager holds a share of the shuffle blocks that reaches the input fractionThreshold.

Note

The input numReducers is not used.

Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.

Note

One entry in mapStatuses internal cache is a MapStatus array indexed by partition id.

MapStatus includes information about the BlockManager (as BlockManagerId) and estimated size of the reduce blocks.

getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping from every BlockManagerId to the cumulative size of the shuffle blocks it holds.
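
The aggregation can be sketched as follows. Status is a hypothetical, simplified MapStatus that uses a plain string as the location, and two details are assumptions of this sketch rather than statements about Spark: sizes are summed for the given reducerId only, and a location qualifies when its share is at least fractionThreshold.

```scala
object LargestOutputsSketch {
  // Hypothetical, simplified MapStatus: a location and estimated block sizes per reducer.
  final case class Status(location: String, sizes: Array[Long])

  def locationsWithLargestOutputs(
      statuses: Seq[Status],
      reducerId: Int,
      fractionThreshold: Double): Option[Seq[String]] = {
    // Interim mapping: location -> cumulative size of its blocks for reducerId.
    val sizeByLocation = statuses
      .groupBy(_.location)
      .map { case (location, group) => location -> group.map(_.sizes(reducerId)).sum }
    val total = sizeByLocation.values.sum
    // Keep the locations whose share of the total reaches the threshold.
    val top = sizeByLocation.collect {
      case (location, size) if total > 0 && size.toDouble / total >= fractionThreshold => location
    }.toSeq.sorted
    if (top.nonEmpty) Some(top) else None
  }
}
```

With sizes 90 and 10 at two locations and a threshold of 0.2, only the first location is returned.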

Incrementing Epoch

incrementEpoch(): Unit

incrementEpoch increments the internal epoch.

incrementEpoch prints out the following DEBUG message to the logs:

Increasing epoch to [epoch]

incrementEpoch is used when:

Checking Availability of Shuffle Map Output Status

containsShuffle(
  shuffleId: Int): Boolean

containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.

containsShuffle is used when DAGScheduler is requested to create a ShuffleMapStage (for a ShuffleDependency).

Registering Shuffle

registerShuffle(
  shuffleId: Int,
  numMaps: Int): Unit

registerShuffle registers a new ShuffleStatus (for the given shuffle ID and the number of partitions) to the shuffleStatuses internal registry.

registerShuffle throws an IllegalArgumentException when the shuffle ID has already been registered:

Shuffle ID [shuffleId] registered twice
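
The register-once behaviour can be sketched with a concurrent map. Status is a hypothetical placeholder for ShuffleStatus, and using putIfAbsent is an assumption of this sketch; it only illustrates how a second registration of the same shuffle ID ends in an IllegalArgumentException.

```scala
import java.util.concurrent.ConcurrentHashMap

object ShuffleRegistrySketch {
  // Hypothetical placeholder for ShuffleStatus.
  final case class Status(numPartitions: Int)

  // Registry of statuses by shuffle ID.
  val shuffleStatuses = new ConcurrentHashMap[Int, Status]()

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    // putIfAbsent returns the existing value (non-null) when the ID is already registered.
    if (shuffleStatuses.putIfAbsent(shuffleId, Status(numMaps)) != null) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }
}
```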

registerShuffle is used when:

Registering Map Outputs for Shuffle (Possibly with Epoch Change)

registerMapOutputs(
  shuffleId: Int,
  statuses: Array[MapStatus],
  changeEpoch: Boolean = false): Unit

registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.

registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).

registerMapOutputs is used when DAGScheduler handles successful ShuffleMapTask completion and executor lost events.

Finding Serialized Map Output Statuses (And Possibly Broadcasting Them)

getSerializedMapOutputStatuses(
  shuffleId: Int): Array[Byte]

getSerializedMapOutputStatuses finds cached serialized map statuses for the input shuffleId.

If found, getSerializedMapOutputStatuses returns the cached serialized map statuses.

Otherwise, getSerializedMapOutputStatuses acquires the shuffle lock for shuffleId and looks up the cached serialized map statuses again, since another thread could have updated the cachedSerializedStatuses internal cache in the meantime.

getSerializedMapOutputStatuses returns the serialized map statuses if found.

If not, getSerializedMapOutputStatuses serializes the local array of MapStatuses (from checkCachedStatuses).

getSerializedMapOutputStatuses prints out the following INFO message to the logs:

Size of output statuses for shuffle [shuffleId] is [bytes] bytes

getSerializedMapOutputStatuses saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses also saves its broadcast version in cachedSerializedBroadcast internal cache.

If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and getSerializedMapOutputStatuses prints out the following INFO message to the logs:

Epoch changed, not caching!

getSerializedMapOutputStatuses removes the broadcast.

getSerializedMapOutputStatuses returns the serialized map statuses.

getSerializedMapOutputStatuses is used when MapOutputTrackerMaster responds to GetMapOutputMessage requests and DAGScheduler creates ShuffleMapStage for ShuffleDependency (copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).
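
The check, lock, check again, serialize, and cache-only-if-the-epoch-is-unchanged flow can be sketched as below. It is a simplified model under stated assumptions: a single lock replaces the per-shuffle lock, serialize is a caller-supplied function, the broadcast path is left out, and the serializations counter exists only to make the behaviour observable.

```scala
object SerializedStatusCacheSketch {
  private val epochLock = new Object
  private var epoch = 0L
  private var cacheEpoch = 0L
  private val cached = scala.collection.mutable.Map.empty[Int, Array[Byte]]

  // Counts how often the expensive serialization ran (for illustration only).
  var serializations = 0

  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  // Drops stale entries when the epoch moved on, then returns the cached
  // bytes (if any) together with the epoch observed at lookup time.
  private def checkCached(shuffleId: Int): (Option[Array[Byte]], Long) =
    epochLock.synchronized {
      if (epoch > cacheEpoch) {
        cached.clear()
        cacheEpoch = epoch
      }
      (cached.get(shuffleId), epoch)
    }

  def serialized(shuffleId: Int, serialize: Int => Array[Byte]): Array[Byte] = {
    checkCached(shuffleId) match {
      case (Some(bytes), _) => bytes
      case (None, _) =>
        // One lock for all shuffles here; the text describes a lock per shuffle.
        this.synchronized {
          checkCached(shuffleId) match {
            case (Some(bytes), _) => bytes // another thread cached it meanwhile
            case (None, epochGotten) =>
              val bytes = serialize(shuffleId)
              serializations += 1
              epochLock.synchronized {
                // Cache only if no epoch change happened while serializing.
                if (epoch == epochGotten) cached(shuffleId) = bytes
              }
              bytes
          }
        }
    }
  }
}
```

The second lookup under the lock is what keeps two concurrent callers from serializing the same statuses twice.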

Finding Cached Serialized Map Statuses

checkCachedStatuses(): Boolean

checkCachedStatuses is an internal helper method that getSerializedMapOutputStatuses uses to do some bookkeeping (when the epoch and cacheEpoch differ) and set local statuses, retBytes and epochGotten (that getSerializedMapOutputStatuses uses).

Internally, checkCachedStatuses acquires the epochLock lock and compares epoch to cacheEpoch.

If epoch is younger (i.e. greater), checkCachedStatuses clears the cachedSerializedStatuses internal cache, removes the cached broadcasts and sets cacheEpoch to be epoch.

checkCachedStatuses gets the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses).

When the serialized map output status is found, checkCachedStatuses saves it in a local retBytes and returns true.

When not found, you should see the following DEBUG message in the logs:

cached status not found for : [shuffleId]

checkCachedStatuses uses the mapStatuses internal cache to get map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses), or falls back to an empty array, and assigns the result to a local statuses. checkCachedStatuses sets the local epochGotten to the current epoch and returns false.

Registering Shuffle Map Output

registerMapOutput(
  shuffleId: Int,
  mapId: Int,
  status: MapStatus): Unit

registerMapOutput finds the ShuffleStatus by the given shuffle ID and adds the given MapStatus:

registerMapOutput is used when DAGScheduler is requested to handle a ShuffleMapTask completion.

Map Output Statistics for ShuffleDependency

getStatistics(
  dep: ShuffleDependency[_, _, _]): MapOutputStatistics

getStatistics requests the input ShuffleDependency for the shuffle ID and looks up the corresponding ShuffleStatus (in the shuffleStatuses registry).

getStatistics assumes that the ShuffleStatus is in shuffleStatuses registry.

getStatistics requests the ShuffleStatus for the MapStatuses (of the ShuffleDependency).

getStatistics uses the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property to decide on parallelism to calculate the statistics.

With no parallelism, getStatistics simply traverses over the MapStatuses and requests them (one by one) for the size of every shuffle block.

Note

getStatistics requests the given ShuffleDependency for the Partitioner that in turn is requested for the number of partitions.

The number of blocks is the number of MapStatuses multiplied by the number of partitions.

And hence the need for parallelism based on the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property.

In the end, getStatistics creates a MapOutputStatistics with the shuffle ID (of the given ShuffleDependency) and the total sizes (summed up for every partition).
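
The sequential (no parallelism) path can be sketched as below. Status and Statistics are hypothetical stand-ins for MapStatus and MapOutputStatistics, and the parallel path chosen through spark.shuffle.mapOutput.parallelAggregationThreshold is not shown.

```scala
object MapOutputStatisticsSketch {
  // Hypothetical stand-ins: sizes(i) is the estimated size of the block for reduce partition i.
  final case class Status(sizes: Array[Long])
  final case class Statistics(shuffleId: Int, bytesByPartitionId: Array[Long])

  def statistics(shuffleId: Int, numPartitions: Int, statuses: Seq[Status]): Statistics = {
    val totalSizes = new Array[Long](numPartitions)
    // Visits (number of statuses) x (number of partitions) blocks, one by one.
    for (status <- statuses; i <- 0 until numPartitions) {
      totalSizes(i) += status.sizes(i)
    }
    Statistics(shuffleId, totalSizes)
  }
}
```

The nested traversal makes the cost proportional to the number of blocks, which is why a threshold for switching to parallel aggregation is useful.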

getStatistics is used when:

Deregistering All Map Outputs of Shuffle Stage

unregisterAllMapOutput(
  shuffleId: Int): Unit

unregisterAllMapOutput...FIXME

unregisterAllMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).

Deregistering Shuffle

unregisterShuffle(
  shuffleId: Int): Unit

unregisterShuffle...FIXME

unregisterShuffle is part of the MapOutputTracker abstraction.

Deregistering Shuffle Outputs Associated with Host

removeOutputsOnHost(
  host: String): Unit

removeOutputsOnHost...FIXME

removeOutputsOnHost is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs and handle a worker removal.

Deregistering Shuffle Outputs Associated with Executor

removeOutputsOnExecutor(
  execId: String): Unit

removeOutputsOnExecutor...FIXME

removeOutputsOnExecutor is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs.

Number of Partitions with Shuffle Map Outputs Available

getNumAvailableOutputs(
  shuffleId: Int): Int

getNumAvailableOutputs...FIXME

getNumAvailableOutputs is used when ShuffleMapStage is requested for the number of partitions with shuffle outputs available.

Finding Missing Partitions

findMissingPartitions(
  shuffleId: Int): Option[Seq[Int]]

findMissingPartitions...FIXME

findMissingPartitions is used when ShuffleMapStage is requested for missing partitions.

Finding Locations with Blocks and Sizes

getMapSizesByExecutorId(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])]

getMapSizesByExecutorId is part of the MapOutputTracker abstraction.

getMapSizesByExecutorId returns a collection of BlockManagerIds with their blocks and sizes.

When executed, getMapSizesByExecutorId prints out the following DEBUG message to the logs:

Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]

getMapSizesByExecutorId finds map outputs for the input shuffleId.

Note

getMapSizesByExecutorId gets the map outputs for all the partitions (despite the method's signature).

In the end, getMapSizesByExecutorId converts shuffle map outputs (as MapStatuses) into the collection of BlockManagerIds with their blocks and sizes.
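
The final conversion step can be sketched as a grouping of blocks by location. Everything here is illustrative: Status is a simplified MapStatus with a string location, block names are plain strings in a shuffle_[shuffleId]_[mapId]_[reduceId] form, and treating endPartition as exclusive is an assumption of this sketch.

```scala
object MapSizesSketch {
  // Hypothetical, simplified MapStatus: a location and estimated block sizes per reducer.
  final case class Status(location: String, sizes: Array[Long])

  // Returns, per location, the (block name, size) pairs for the reduce
  // partitions in the half-open range [startPartition, endPartition).
  def mapSizesByLocation(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Seq[Status]): Map[String, Seq[(String, Long)]] = {
    val blocks = for {
      (status, mapId) <- statuses.zipWithIndex
      reduceId <- startPartition until endPartition
    } yield (status.location, (s"shuffle_${shuffleId}_${mapId}_${reduceId}", status.sizes(reduceId)))
    // Group the blocks by the location that serves them.
    blocks.groupBy(_._1).map { case (location, group) => location -> group.map(_._2) }
  }
}
```

Grouping by location lets a reducer fetch all of its blocks from one BlockManager in a single request batch.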

Logging

Enable ALL logging level for org.apache.spark.MapOutputTrackerMaster logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.MapOutputTrackerMaster=ALL

Refer to Logging.