MapOutputTracker — Shuffle Map Output Registry

MapOutputTracker is a Spark service that runs on the driver and executors and tracks shuffle map outputs (with information about the BlockManager and the estimated sizes of the reduce blocks per shuffle).

MapOutputTracker is registered as the MapOutputTracker RPC Endpoint in the RPC Environment when SparkEnv is created.

There are two concrete MapOutputTrackers, one for the driver (MapOutputTrackerMaster) and another for executors (MapOutputTrackerWorker).

Given the different runtime environments of the driver and executors, accessing the current MapOutputTracker is possible using SparkEnv.

SparkEnv.get.mapOutputTracker

Table 1. MapOutputTracker Internal Registries and Counters
Name Description

mapStatuses

Internal cache with MapStatus array (indexed by partition id) per shuffle id.

Used when MapOutputTracker finds map outputs for a ShuffleDependency, updates epoch and unregisters a shuffle.

epoch

Tracks the epoch in a Spark application.

Starts from 0 when MapOutputTracker is created.

Can be updated (on MapOutputTrackerWorkers) or incremented (on the driver’s MapOutputTrackerMaster).

epochLock

FIXME

MapOutputTracker is also used for mapOutputTracker.containsShuffle and MapOutputTrackerMaster.registerShuffle when a new ShuffleMapStage is created.

MapOutputTrackerMaster.getStatistics(dependency) returns MapOutputStatistics that becomes the result of JobWaiter.taskSucceeded for ShuffleMapStage if it’s the final stage in a job.

MapOutputTrackerMaster.registerMapOutputs is called for a shuffle id and a list of MapStatus when a ShuffleMapStage is finished.

MapOutputTracker is used in BlockStoreShuffleReader and when creating BlockManager and BlockManagerSlaveEndpoint.

trackerEndpoint Property

trackerEndpoint is an RpcEndpointRef that MapOutputTracker uses to send RPC messages.

trackerEndpoint is initialized when SparkEnv is created for the driver and executors and cleared when MapOutputTrackerMaster is stopped.

Creating MapOutputTracker Instance

FIXME

deserializeMapStatuses Method

FIXME

sendTracker Method

FIXME

serializeMapStatuses Method

FIXME

Computing Statistics for ShuffleDependency — getStatistics Method

getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics

getStatistics returns a MapOutputStatistics, which is simply a pair of the shuffle id (of the input ShuffleDependency) and the estimated sizes of the reduce shuffle blocks, summed per partition across all the BlockManagers.

Internally, getStatistics finds map outputs for the input ShuffleDependency and, for every MapStatus and partition, adds the estimated size of the reduce block (in bytes) to the total for that partition.

The internal totalSizes array has as many elements as the Partitioner of the input ShuffleDependency has partitions. Each element is the sum, over all MapStatuses (one per map output held by a BlockManager), of the estimated size of the block for that partition.

getStatistics is used when DAGScheduler accepts a ShuffleDependency for execution (and the corresponding ShuffleMapStage has already been computed) and when it gets notified that a ShuffleMapTask has completed (and map-stage jobs waiting for the stage are then marked as finished).
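
The per-partition summation described above can be illustrated with a minimal, self-contained sketch. SimpleMapStatus, SimpleStatistics and StatisticsSketch are hypothetical stand-ins introduced only for this example (they are not Spark's MapStatus or MapOutputStatistics), and the number of partitions is passed in directly rather than read from a Partitioner.

```scala
// Hypothetical, simplified stand-in for a MapStatus: a location plus
// one estimated block size (in bytes) per reduce partition.
final case class SimpleMapStatus(location: String, sizes: Array[Long]) {
  def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}

// Hypothetical stand-in for the returned statistics: shuffle id and per-partition totals.
final case class SimpleStatistics(shuffleId: Int, bytesByPartitionId: Array[Long])

object StatisticsSketch {
  // Sums the estimated block sizes per reduce partition across all map statuses.
  def getStatistics(
      shuffleId: Int,
      numPartitions: Int,
      statuses: Seq[SimpleMapStatus]): SimpleStatistics = {
    val totalSizes = new Array[Long](numPartitions)
    for (status <- statuses; i <- totalSizes.indices) {
      totalSizes(i) += status.getSizeForBlock(i)
    }
    SimpleStatistics(shuffleId, totalSizes)
  }
}
```

With two map outputs of sizes (10, 20) and (1, 2), the sketch yields per-partition totals of 11 and 22.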

Computing BlockManagerIds with Their Blocks and Sizes — getMapSizesByExecutorId Methods

getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])]

getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])]

The two-argument getMapSizesByExecutorId calls the other getMapSizesByExecutorId with endPartition as reduceId + 1 and is used exclusively in tests.

FIXME How do the start and end partitions influence the return value?

getMapSizesByExecutorId returns a collection of BlockManagerIds with their blocks and sizes.

When executed, you should see the following DEBUG message in the logs:

DEBUG Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]

getMapSizesByExecutorId finds map outputs for the input shuffleId.

getMapSizesByExecutorId gets the map outputs for all the partitions (despite the method’s signature).

In the end, getMapSizesByExecutorId converts shuffle map outputs (as MapStatuses) into the collection of BlockManagerIds with their blocks and sizes.

getMapSizesByExecutorId is exclusively used when BlockStoreShuffleReader reads combined records for a reduce task.

Returning Current Epoch — getEpoch Method

getEpoch: Long

getEpoch returns the current epoch.

getEpoch is used when DAGScheduler is notified that an executor was lost and when TaskSetManager is created (and sets the epoch for the tasks in a TaskSet).

Updating Epoch — updateEpoch Method

updateEpoch(newEpoch: Long): Unit

updateEpoch updates epoch to the input newEpoch when newEpoch is greater than the current epoch (and hence more recent) and then clears the mapStatuses internal cache.

You should see the following INFO message in the logs:

INFO MapOutputTrackerWorker: Updating epoch to [newEpoch] and clearing cache

updateEpoch is exclusively used when TaskRunner runs (for a task).
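
The epoch handling can be sketched as follows. This is a toy model only: EpochSketch is a hypothetical class, the cached values are plain strings rather than MapStatus arrays, and the use of epochLock as a simple lock object guarding epoch is an assumption (this document leaves epochLock undescribed).

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical toy tracker holding only the epoch and the cache.
class EpochSketch {
  private val epochLock = new AnyRef // assumed to guard reads and writes of epoch
  private var epoch: Long = 0L       // starts from 0
  val mapStatuses = TrieMap.empty[Int, Array[String]]

  def getEpoch: Long = epochLock.synchronized { epoch }

  // Adopts newEpoch only when it is more recent; cached map outputs
  // may then be stale, so the cache is dropped.
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      mapStatuses.clear()
    }
  }
}
```

Calling updateEpoch with a value that is not greater than the current epoch leaves both the epoch and the cache untouched.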

Unregistering Shuffle — unregisterShuffle Method

unregisterShuffle(shuffleId: Int): Unit

unregisterShuffle unregisters shuffleId, i.e. removes shuffleId entry from the mapStatuses internal cache.

unregisterShuffle is used when ContextCleaner removes a shuffle (blocks) from MapOutputTrackerMaster and BlockManagerMaster (aka shuffle cleanup) and when BlockManagerSlaveEndpoint handles RemoveShuffle message.

stop Method

stop(): Unit

stop does nothing at all.

stop is used exclusively when SparkEnv stops (and stops all the services, including MapOutputTracker).

stop is overridden by MapOutputTrackerMaster.

Finding Map Outputs For ShuffleDependency in Cache or Fetching Remotely — getStatuses Internal Method

getStatuses(shuffleId: Int): Array[MapStatus]

getStatuses finds MapStatuses for the input shuffleId in the mapStatuses internal cache and, when not available, fetches them from a remote MapOutputTrackerMaster (using RPC).

Internally, getStatuses first queries the mapStatuses internal cache and returns the map outputs if found.

If not found (in the mapStatuses internal cache), you should see the following INFO message in the logs:

INFO Don't have map outputs for shuffle [id], fetching them

If some other process fetches the map outputs for the shuffleId (as recorded in fetching internal registry), getStatuses waits until it is done.

When no other process fetches the map outputs, getStatuses registers the input shuffleId in fetching internal registry (of shuffle map outputs being fetched).

You should see the following INFO message in the logs:

INFO Doing the fetch; tracker endpoint = [trackerEndpoint]

getStatuses sends a GetMapOutputStatuses RPC remote message for the input shuffleId to the trackerEndpoint, expecting an Array[Byte].

getStatuses requests shuffle map outputs remotely within a timeout and with retries. Refer to RpcEndpointRef.

getStatuses deserializes the map output statuses and records the result in the mapStatuses internal cache.

You should see the following INFO message in the logs:

INFO Got the output locations

getStatuses removes the input shuffleId from fetching internal registry.

You should see the following DEBUG message in the logs:

DEBUG Fetching map output statuses for shuffle [id] took [time] ms

If getStatuses could not find the map output locations for the input shuffleId (locally and remotely), you should see the following ERROR message in the logs and getStatuses throws a MetadataFetchFailedException.

ERROR Missing all output locations for shuffle [id]

getStatuses is used when MapOutputTracker computes BlockManagerIds with their blocks and sizes (getMapSizesByExecutorId) and computes statistics for a ShuffleDependency (getStatistics).
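
The cache-or-fetch flow above, including the fetching registry, can be modelled roughly as below. Everything here is an illustrative assumption: StatusCacheSketch is a hypothetical class, fetchRemotely stands in for the GetMapOutputStatuses RPC round trip plus deserialization, statuses are plain strings, and IllegalStateException is used in place of MetadataFetchFailedException.

```scala
import scala.collection.mutable

// Hypothetical model of the cache-or-fetch flow.
class StatusCacheSketch(fetchRemotely: Int => Option[Array[String]]) {
  private val mapStatuses = mutable.Map.empty[Int, Array[String]]
  private val fetching = mutable.Set.empty[Int] // shuffle ids being fetched right now

  def getStatuses(shuffleId: Int): Array[String] = {
    val cached = fetching.synchronized {
      // Wait while another thread fetches the same shuffle.
      while (fetching.contains(shuffleId)) fetching.wait()
      val hit = mapStatuses.get(shuffleId)
      // Nobody else is fetching and the cache missed: this thread does the fetch.
      if (hit.isEmpty) { fetching += shuffleId }
      hit
    }
    cached.getOrElse {
      try {
        val fetched = fetchRemotely(shuffleId)
        // Record the result in the cache (guarded by the same lock as the reads).
        fetching.synchronized { fetched.foreach(s => mapStatuses.put(shuffleId, s)) }
        fetched.getOrElse(
          throw new IllegalStateException(
            s"Missing all output locations for shuffle $shuffleId"))
      } finally {
        // Always unregister the shuffle id and wake up waiting threads.
        fetching.synchronized {
          fetching -= shuffleId
          fetching.notifyAll()
        }
      }
    }
  }
}
```

A second call for the same shuffle id is served from the cache, so fetchRemotely runs only once per successfully fetched shuffle.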

Converting MapStatuses To BlockManagerIds with ShuffleBlockIds and Their Sizes — convertMapStatuses Object Method

convertMapStatuses(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int,
  statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])]

convertMapStatuses iterates over the input statuses array (of MapStatus entries indexed by map id). For every status and every partition from the input startPartition (inclusive) to endPartition (exclusive), it pairs the BlockManagerId of the MapStatus with a ShuffleBlockId (built from the input shuffleId, the map id, and the partition) and the estimated size of the reduce block.

For any empty MapStatus, you should see the following ERROR message in the logs:

ERROR Missing an output location for shuffle [id]

And convertMapStatuses throws a MetadataFetchFailedException (with shuffleId, startPartition, and the above error message).
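
A minimal sketch of this conversion follows. The types are hypothetical simplifications (BlockIdSketch for ShuffleBlockId, a plain String for BlockManagerId, StatusSketch for MapStatus), an empty MapStatus is modelled as null, and IllegalStateException stands in for MetadataFetchFailedException.

```scala
// Hypothetical simplified types for illustration only.
final case class BlockIdSketch(shuffleId: Int, mapId: Int, reduceId: Int)
final case class StatusSketch(location: String, sizes: Array[Long])

object ConvertSketch {
  def convertMapStatuses(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[StatusSketch]): Seq[(String, Seq[(BlockIdSketch, Long)])] = {
    val blocks = statuses.toSeq.zipWithIndex.flatMap { case (status, mapId) =>
      // A missing (null) entry means there is no output location for this map id.
      if (status == null) {
        throw new IllegalStateException(
          s"Missing an output location for shuffle $shuffleId")
      }
      // endPartition is exclusive, hence `until`.
      (startPartition until endPartition).map { part =>
        (status.location, (BlockIdSketch(shuffleId, mapId, part), status.sizes(part)))
      }
    }
    // Group the (location, block) pairs by location.
    blocks.groupBy(_._1).map { case (loc, pairs) => (loc, pairs.map(_._2)) }.toSeq
  }
}
```

For example, with startPartition 1 and endPartition 3, each location ends up with two blocks (partitions 1 and 2) per map output it holds.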

Sending Blocking Messages To trackerEndpoint RpcEndpointRef — askTracker Method

askTracker[T](message: Any): T

askTracker sends the message to trackerEndpoint RpcEndpointRef and waits for a result.

When an exception happens, you should see the following ERROR message in the logs and askTracker throws a SparkException.

ERROR Error communicating with MapOutputTracker

askTracker is used when MapOutputTracker fetches map outputs for ShuffleDependency remotely and sends a one-way message.
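
The blocking ask-and-wrap pattern can be sketched with plain Scala futures. This is not the RpcEndpointRef API: the ask function parameter, the TrackerException class (standing in for SparkException), and the 5-second default timeout are all assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical stand-in for SparkException.
class TrackerException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object AskSketch {
  // `ask` models an endpoint reference's asynchronous request;
  // the call blocks until the reply arrives or the timeout expires.
  def askTracker[T](ask: Any => Future[T], message: Any, timeout: FiniteDuration = 5.seconds): T =
    try {
      Await.result(ask(message), timeout)
    } catch {
      case NonFatal(e) =>
        // Wrap any failure (including a timeout) in a single exception type.
        throw new TrackerException("Error communicating with MapOutputTracker", e)
    }
}
```

Wrapping every failure in one exception type means callers only need to handle a single error case, at the cost of having to inspect the cause to tell a timeout from a remote failure.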