MapOutputTrackerMaster¶
MapOutputTrackerMaster is a MapOutputTracker for the driver.
MapOutputTrackerMaster is the source of truth of shuffle map output locations.
Creating Instance¶
MapOutputTrackerMaster takes the following to be created:
- SparkConf
- BroadcastManager
- isLocal flag (to indicate whether MapOutputTrackerMaster runs in local or a cluster)
When created, MapOutputTrackerMaster starts dispatcher threads on the map-output-dispatcher thread pool.
MapOutputTrackerMaster is created when:
- SparkEnv utility is used to create a SparkEnv for the driver
maxRpcMessageSize¶
maxRpcMessageSize is...FIXME
BroadcastManager¶
MapOutputTrackerMaster is given a BroadcastManager when created.
Shuffle Map Output Status Registry¶
MapOutputTrackerMaster uses an internal registry of ShuffleStatuses by shuffle stages.
MapOutputTrackerMaster adds a new shuffle when requested to register one (when DAGScheduler is requested to create a ShuffleMapStage for a ShuffleDependency).
MapOutputTrackerMaster uses the registry when requested for the following:
- unregisterMapOutput, unregisterAllMapOutput, unregisterShuffle, removeOutputsOnHost, removeOutputsOnExecutor, containsShuffle, getNumAvailableOutputs, findMissingPartitions, getLocationsWithLargestOutputs, getMapSizesByExecutorId
MapOutputTrackerMaster removes (clears) all shuffles when requested to stop.
Configuration Properties¶
MapOutputTrackerMaster uses the following configuration properties:
Map and Reduce Task Thresholds for Preferred Locations¶
MapOutputTrackerMaster defines 1000 (tasks) as the hardcoded threshold of the number of map and reduce tasks when requested to compute preferred locations with spark.shuffle.reduceLocality.enabled.
Map Output Threshold for Preferred Location of Reduce Tasks¶
MapOutputTrackerMaster defines 0.2 as the fraction of total map output that must be at a location for it to be considered as a preferred location for a reduce task.
Making this larger will focus on fewer locations where most data can be read locally, but may lead to more delay in scheduling if those locations are busy.
MapOutputTrackerMaster uses the fraction when requested for the preferred locations of shuffle RDDs.
GetMapOutputMessage Queue¶
MapOutputTrackerMaster uses a blocking queue (a Java LinkedBlockingQueue) for requests for map output statuses.
GetMapOutputMessage(
shuffleId: Int,
context: RpcCallContext)
GetMapOutputMessage holds the shuffle ID and the RpcCallContext of the caller.
A new GetMapOutputMessage is added to the queue when MapOutputTrackerMaster is requested to post one.
MapOutputTrackerMaster uses MessageLoop Dispatcher Threads to process GetMapOutputMessages.
MessageLoop Dispatcher Thread¶
MessageLoop is a thread of execution to handle GetMapOutputMessages until a PoisonPill marker message arrives (when MapOutputTrackerMaster is requested to stop).
MessageLoop takes a GetMapOutputMessage and prints out the following DEBUG message to the logs:
Handling request to send map output locations for shuffle [shuffleId] to [hostPort]
MessageLoop then finds the ShuffleStatus by the shuffle ID in the shuffleStatuses internal registry and replies back (to the RPC client) with a serialized map output status (with the BroadcastManager and spark.shuffle.mapOutput.minSizeForBroadcast configuration property).
MessageLoop threads run on the map-output-dispatcher Thread Pool.
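The queue-and-dispatcher arrangement described above can be sketched with plain JDK concurrency utilities. This is a minimal sketch, not Spark's code: `Request`, `runOnce` and the reply callback are hypothetical stand-ins (the real GetMapOutputMessage carries an RpcCallContext), and putting the PoisonPill back on the queue so that every loop sees it is an assumption of the sketch.

```scala
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}

object MessageLoopSketch {
  // Hypothetical stand-in for GetMapOutputMessage: a shuffle ID and a way to reply.
  final case class Request(shuffleId: Int, reply: String => Unit)

  // Marker message that tells a loop to stop; it is never handled as a request.
  val PoisonPill: Request = Request(-99, _ => ())

  private val requests = new LinkedBlockingQueue[Request]()

  // Each dispatcher thread runs this loop until it takes the PoisonPill.
  private final class MessageLoop(done: CountDownLatch) extends Runnable {
    override def run(): Unit = {
      try {
        var running = true
        while (running) {
          val msg = requests.take() // blocks until a message is available
          if (msg eq PoisonPill) {
            requests.offer(PoisonPill) // assumption: re-queue so other loops stop too
            running = false
          } else {
            msg.reply(s"statuses for shuffle ${msg.shuffleId}")
          }
        }
      } finally done.countDown()
    }
  }

  /** Starts `numThreads` loops, posts the shuffle IDs, stops the loops, returns the replies. */
  def runOnce(numThreads: Int, shuffleIds: Seq[Int]): Set[String] = {
    requests.clear()
    val replies = scala.collection.mutable.Set.empty[String]
    val done = new CountDownLatch(numThreads)
    val pool = Executors.newFixedThreadPool(numThreads)
    (1 to numThreads).foreach(_ => pool.execute(new MessageLoop(done)))
    shuffleIds.foreach { id =>
      requests.offer(Request(id, r => replies.synchronized { replies += r }))
    }
    requests.offer(PoisonPill) // queued last, so all requests are taken before it
    done.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    replies.synchronized(replies.toSet)
  }
}
```

The sketch uses a plain fixed thread pool; the daemon flag and the map-output-dispatcher thread name prefix are left out for brevity.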
map-output-dispatcher Thread Pool¶
threadpool: ThreadPoolExecutor
threadpool is a daemon fixed thread pool registered with map-output-dispatcher thread name prefix.
threadpool uses spark.shuffle.mapOutput.dispatcher.numThreads configuration property for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage messages.
The dispatcher threads are started immediately when MapOutputTrackerMaster is created.
The thread pool is shut down when MapOutputTrackerMaster is requested to stop.
Epoch Number¶
MapOutputTrackerMaster uses an epoch number to...FIXME
getEpoch is used when:
- DAGScheduler is requested to removeExecutorAndUnregisterOutputs
- TaskSetManager is created (and sets the epoch to tasks)
Enqueueing GetMapOutputMessage¶
post(
message: GetMapOutputMessage): Unit
post simply adds the input GetMapOutputMessage to the mapOutputRequests internal queue.
post is used when MapOutputTrackerMasterEndpoint is requested to handle a GetMapOutputStatuses message.
Stopping MapOutputTrackerMaster¶
stop(): Unit
stop...FIXME
stop is part of the MapOutputTracker abstraction.
Unregistering Shuffle Map Output¶
unregisterMapOutput(
shuffleId: Int,
mapId: Int,
bmAddress: BlockManagerId): Unit
unregisterMapOutput...FIXME
unregisterMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).
Computing Preferred Locations¶
getPreferredLocationsForShuffle(
dep: ShuffleDependency[_, _, _],
partitionId: Int): Seq[String]
getPreferredLocationsForShuffle computes the locations (BlockManagers) with the most shuffle map outputs for the input ShuffleDependency and Partition.
getPreferredLocationsForShuffle computes the locations when all of the following are met:
- spark.shuffle.reduceLocality.enabled configuration property is enabled
- The number of "map" partitions (of the RDD of the input ShuffleDependency) is below SHUFFLE_PREF_MAP_THRESHOLD
- The number of "reduce" partitions (of the Partitioner of the input ShuffleDependency) is below SHUFFLE_PREF_REDUCE_THRESHOLD
Note
getPreferredLocationsForShuffle is simply getLocationsWithLargestOutputs with a guard condition.
Internally, getPreferredLocationsForShuffle checks whether spark.shuffle.reduceLocality.enabled configuration property is enabled with the number of partitions of the RDD of the input ShuffleDependency and partitions in the partitioner of the input ShuffleDependency both being less than 1000.
Note
The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000 and are not configurable.
If the condition holds, getPreferredLocationsForShuffle finds locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (with the number of partitions in the partitioner of the input ShuffleDependency and 0.2) and returns the hosts of the preferred BlockManagers.
Note
0.2 is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.
getPreferredLocationsForShuffle is used when ShuffledRDD and Spark SQL's ShuffledRowRDD are requested for preferred locations of a partition.
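The guard condition can be illustrated with a small, Spark-free sketch. The `preferredHosts` function, its parameters and the `REDUCER_PREF_LOCS_FRACTION` name are assumptions made for illustration; `largestOutputs` stands in for getLocationsWithLargestOutputs and is expected to return host names already.

```scala
object PreferredLocationsSketch {
  // Hardcoded (not configurable) values described in this section.
  val SHUFFLE_PREF_MAP_THRESHOLD = 1000
  val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
  val REDUCER_PREF_LOCS_FRACTION = 0.2 // hypothetical name for the 0.2 fraction

  def preferredHosts(
      reduceLocalityEnabled: Boolean,
      numMapPartitions: Int,
      numReducePartitions: Int)(
      largestOutputs: Double => Option[Seq[String]]): Seq[String] = {
    if (reduceLocalityEnabled &&
        numMapPartitions < SHUFFLE_PREF_MAP_THRESHOLD &&
        numReducePartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
      // Only now is the (potentially expensive) lookup performed.
      largestOutputs(REDUCER_PREF_LOCS_FRACTION).getOrElse(Seq.empty)
    } else {
      Seq.empty // no preference: too many partitions or locality disabled
    }
  }
}
```

With 1000 or more map (or reduce) partitions the lookup is skipped entirely and no preferred locations are reported.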
Finding Locations with Largest Number of Shuffle Map Outputs¶
getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double): Option[Array[BlockManagerId]]
getLocationsWithLargestOutputs returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold (given the total size of all the shuffle blocks for the shuffle across all BlockManagers).
Note
getLocationsWithLargestOutputs may return no BlockManagerId if their shuffle blocks do not total up above the input fractionThreshold.
Note
The input numReducers is not used.
Internally, getLocationsWithLargestOutputs queries the mapStatuses internal cache for the input shuffleId.
Note
One entry in mapStatuses internal cache is a MapStatus array indexed by partition id.
MapStatus includes information about the BlockManager (as BlockManagerId) and estimated size of the reduce blocks.
getLocationsWithLargestOutputs iterates over the MapStatus array and builds an interim mapping between BlockManagerId and the cumulative sum of shuffle blocks across BlockManagers.
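The aggregation can be sketched as follows. `Location` and `Status` are hypothetical stand-ins for BlockManagerId and MapStatus, and treating a share exactly equal to the threshold as qualifying is an assumption of the sketch.

```scala
object LargestOutputsSketch {
  // Hypothetical stand-ins for BlockManagerId and MapStatus.
  final case class Location(executorId: String, host: String)
  // sizes(i) is the estimated size of the block destined for reducer i.
  final case class Status(location: Location, sizes: Array[Long])

  def locationsWithLargestOutputs(
      statuses: Seq[Status],
      reducerId: Int,
      fractionThreshold: Double): Option[Seq[Location]] = {
    // Interim mapping: location -> cumulative size of blocks for this reducer.
    val sizeByLocation = scala.collection.mutable.LinkedHashMap.empty[Location, Long]
    var total = 0L
    statuses.foreach { s =>
      val size = s.sizes(reducerId)
      if (size > 0) {
        sizeByLocation(s.location) = sizeByLocation.getOrElse(s.location, 0L) + size
        total += size
      }
    }
    // Keep the locations whose share of the total reaches the threshold.
    // (sizeByLocation is empty when total is 0, so there is no division by zero.)
    val top = sizeByLocation.toSeq.collect {
      case (loc, size) if size.toDouble / total >= fractionThreshold => loc
    }
    if (top.nonEmpty) Some(top) else None
  }
}
```

When the output is spread evenly over many locations, no single location reaches the threshold and the result is None.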
Incrementing Epoch¶
incrementEpoch(): Unit
incrementEpoch increments the internal epoch.
incrementEpoch prints out the following DEBUG message to the logs:
Increasing epoch to [epoch]
incrementEpoch is used when:
- MapOutputTrackerMaster is requested to unregisterMapOutput, unregisterAllMapOutput, removeOutputsOnHost and removeOutputsOnExecutor
- DAGScheduler is requested to handle a ShuffleMapTask completion (of a ShuffleMapStage)
Checking Availability of Shuffle Map Output Status¶
containsShuffle(
shuffleId: Int): Boolean
containsShuffle checks if the input shuffleId is registered in the cachedSerializedStatuses or mapStatuses internal caches.
containsShuffle is used when DAGScheduler is requested to create a ShuffleMapStage (for a ShuffleDependency).
Registering Shuffle¶
registerShuffle(
shuffleId: Int,
numMaps: Int): Unit
registerShuffle registers a new ShuffleStatus (for the given shuffle ID and the number of partitions) to the shuffleStatuses internal registry.
registerShuffle throws an IllegalArgumentException when the shuffle ID has already been registered:
Shuffle ID [shuffleId] registered twice
registerShuffle is used when:
- DAGScheduler is requested to create a ShuffleMapStage (for a ShuffleDependency)
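The register-once behaviour can be sketched with a concurrent map. The `Status` class is a hypothetical stand-in for ShuffleStatus, and using a TrieMap for the registry is a choice of the sketch, not a statement about Spark's internals.

```scala
import scala.collection.concurrent.TrieMap

object ShuffleRegistrySketch {
  // Hypothetical stand-in for ShuffleStatus: one slot per map partition.
  final class Status(val numPartitions: Int) {
    val locations = new Array[String](numPartitions) // null means "output missing"
  }

  private val shuffleStatuses = TrieMap.empty[Int, Status]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    // putIfAbsent returns the previous value, so Some(_) means "already registered".
    if (shuffleStatuses.putIfAbsent(shuffleId, new Status(numMaps)).isDefined) {
      throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
    }
  }

  def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)
}
```

Checking and inserting in a single atomic `putIfAbsent` avoids a race between two threads registering the same shuffle ID.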
Registering Map Outputs for Shuffle (Possibly with Epoch Change)¶
registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
changeEpoch: Boolean = false): Unit
registerMapOutputs registers the input statuses (as the shuffle map output) with the input shuffleId in the mapStatuses internal cache.
registerMapOutputs increments epoch if the input changeEpoch is enabled (it is not by default).
registerMapOutputs is used when DAGScheduler handles successful ShuffleMapTask completion and executor lost events.
Finding Serialized Map Output Statuses (And Possibly Broadcasting Them)¶
getSerializedMapOutputStatuses(
shuffleId: Int): Array[Byte]
getSerializedMapOutputStatuses finds cached serialized map statuses for the input shuffleId.
If found, getSerializedMapOutputStatuses returns the cached serialized map statuses.
Otherwise, getSerializedMapOutputStatuses acquires the shuffle lock for shuffleId and looks up the cached serialized map statuses again, since some other thread could have updated the cachedSerializedStatuses internal cache in the meantime.
getSerializedMapOutputStatuses returns the serialized map statuses if found.
If not, getSerializedMapOutputStatuses serializes the local array of MapStatuses (from checkCachedStatuses).
getSerializedMapOutputStatuses prints out the following INFO message to the logs:
Size of output statuses for shuffle [shuffleId] is [bytes] bytes
getSerializedMapOutputStatuses saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses also saves its broadcast version in cachedSerializedBroadcast internal cache.
If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and getSerializedMapOutputStatuses prints out the following INFO message to the logs:
Epoch changed, not caching!
getSerializedMapOutputStatuses removes the broadcast.
getSerializedMapOutputStatuses returns the serialized map statuses.
getSerializedMapOutputStatuses is used when MapOutputTrackerMaster responds to GetMapOutputMessage requests and DAGScheduler creates ShuffleMapStage for ShuffleDependency (copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).
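The check, lock, re-check and epoch-guarded caching steps can be sketched as below. This is a simplified illustration under stated assumptions: `getSerialized`, the `serialize` parameter and the per-shuffle lock map are hypothetical names, broadcasting is left out, and the cache is invalidated lazily on the next lookup after an epoch change.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

object SerializedStatusCacheSketch {
  private val epochLock = new Object
  private var epoch = 0L
  private var cacheEpoch = 0L
  private val cachedSerializedStatuses = mutable.Map.empty[Int, Array[Byte]]
  private val shuffleLocks = TrieMap.empty[Int, AnyRef]

  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  // Returns the cached bytes (if any) together with the epoch observed.
  private def check(shuffleId: Int): (Option[Array[Byte]], Long) =
    epochLock.synchronized {
      if (epoch > cacheEpoch) { // stale cache: drop everything
        cachedSerializedStatuses.clear()
        cacheEpoch = epoch
      }
      (cachedSerializedStatuses.get(shuffleId), epoch)
    }

  def getSerialized(shuffleId: Int)(serialize: => Array[Byte]): Array[Byte] =
    check(shuffleId)._1.getOrElse {
      // Only one thread per shuffle does the expensive serialization.
      shuffleLocks.getOrElseUpdate(shuffleId, new Object).synchronized {
        val (again, epochGotten) = check(shuffleId) // another thread may have cached it
        again.getOrElse {
          val bytes = serialize
          epochLock.synchronized {
            // Cache only if the epoch has not changed in the meantime.
            if (epoch == epochGotten) cachedSerializedStatuses(shuffleId) = bytes
          }
          bytes
        }
      }
    }
}
```

The second lookup under the per-shuffle lock is what keeps concurrent requests for the same shuffle from serializing the same statuses more than once.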
Finding Cached Serialized Map Statuses¶
checkCachedStatuses(): Boolean
checkCachedStatuses is an internal helper method that sets the local statuses, retBytes and epochGotten (that getSerializedMapOutputStatuses uses).
Internally, checkCachedStatuses acquires the epochLock lock and compares epoch with cacheEpoch.
If epoch is younger (i.e. greater), checkCachedStatuses clears the cached serialized statuses and sets cacheEpoch to be epoch.
checkCachedStatuses gets the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses) from the cachedSerializedStatuses internal cache.
When the serialized map output status is found, checkCachedStatuses saves it in a local retBytes and returns true.
When not found, you should see the following DEBUG message in the logs:
cached status not found for : [shuffleId]
checkCachedStatuses uses mapStatuses internal cache to get map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses) and saves them in the local statuses. checkCachedStatuses sets the local epochGotten to the current epoch and returns false.
Registering Shuffle Map Output¶
registerMapOutput(
shuffleId: Int,
mapId: Int,
status: MapStatus): Unit
registerMapOutput finds the ShuffleStatus by the given shuffle ID and adds the given MapStatus:
- The given mapId is the partitionId of the ShuffleMapTask that finished.
- The given shuffleId is the shuffleId of the ShuffleDependency of the ShuffleMapStage (for which the ShuffleMapTask completed)
registerMapOutput is used when DAGScheduler is requested to handle a ShuffleMapTask completion.
Map Output Statistics for ShuffleDependency¶
getStatistics(
dep: ShuffleDependency[_, _, _]): MapOutputStatistics
getStatistics requests the input ShuffleDependency for the shuffle ID and looks up the corresponding ShuffleStatus (in the shuffleStatuses registry).
getStatistics assumes that the ShuffleStatus is in shuffleStatuses registry.
getStatistics requests the ShuffleStatus for the MapStatuses (of the ShuffleDependency).
getStatistics uses the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property to decide on parallelism to calculate the statistics.
With no parallelism, getStatistics simply traverses over the MapStatuses and requests them (one by one) for the size of every shuffle block.
Note
getStatistics requests the given ShuffleDependency for the Partitioner that in turn is requested for the number of partitions.
The number of blocks is the number of MapStatuses multiplied by the number of partitions.
And hence the need for parallelism based on the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property.
In the end, getStatistics creates a MapOutputStatistics with the shuffle ID (of the given ShuffleDependency) and the total sizes (summed up for every partition).
getStatistics is used when:
- DAGScheduler is requested to handle a successful ShuffleMapStage submission and markMapStageJobsAsFinished
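The sequential (no parallelism) path can be sketched as a nested traversal. `Status` is a hypothetical stand-in for MapStatus and `bytesByPartition` is a name made up for the sketch.

```scala
object MapOutputStatisticsSketch {
  // Hypothetical stand-in for MapStatus:
  // sizes(p) is the estimated size of the block for reduce partition p.
  final case class Status(sizes: Array[Long])

  /** Sums the block sizes of every map output, per reduce partition. */
  def bytesByPartition(statuses: Seq[Status], numPartitions: Int): Array[Long] = {
    val totals = new Array[Long](numPartitions)
    // Visits (number of statuses) x (number of partitions) blocks in total.
    for (s <- statuses; p <- 0 until numPartitions) totals(p) += s.sizes(p)
    totals
  }
}
```

The loop touches every block exactly once, which is why the total amount of work grows with the product of map outputs and reduce partitions.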
Deregistering All Map Outputs of Shuffle Stage¶
unregisterAllMapOutput(
shuffleId: Int): Unit
unregisterAllMapOutput...FIXME
unregisterAllMapOutput is used when DAGScheduler is requested to handle a task completion (due to a fetch failure).
Deregistering Shuffle¶
unregisterShuffle(
shuffleId: Int): Unit
unregisterShuffle...FIXME
unregisterShuffle is part of the MapOutputTracker abstraction.
Deregistering Shuffle Outputs Associated with Host¶
removeOutputsOnHost(
host: String): Unit
removeOutputsOnHost...FIXME
removeOutputsOnHost is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs and handle a worker removal.
Deregistering Shuffle Outputs Associated with Executor¶
removeOutputsOnExecutor(
execId: String): Unit
removeOutputsOnExecutor...FIXME
removeOutputsOnExecutor is used when DAGScheduler is requested to removeExecutorAndUnregisterOutputs.
Number of Partitions with Shuffle Map Outputs Available¶
getNumAvailableOutputs(
shuffleId: Int): Int
getNumAvailableOutputs...FIXME
getNumAvailableOutputs is used when ShuffleMapStage is requested for the number of partitions with shuffle outputs available.
Finding Missing Partitions¶
findMissingPartitions(
shuffleId: Int): Option[Seq[Int]]
findMissingPartitions...FIXME
findMissingPartitions is used when ShuffleMapStage is requested for missing partitions.
Finding Locations with Blocks and Sizes¶
getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
getMapSizesByExecutorId is part of the MapOutputTracker abstraction.
getMapSizesByExecutorId returns a collection of BlockManagerIds with their blocks and sizes.
When executed, getMapSizesByExecutorId prints out the following DEBUG message to the logs:
Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]
getMapSizesByExecutorId finds map outputs for the input shuffleId.
Note
getMapSizesByExecutorId gets the map outputs for all the partitions (despite the method's signature).
In the end, getMapSizesByExecutorId converts shuffle map outputs (as MapStatuses) into the collection of BlockManagerIds with their blocks and sizes.
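The final conversion can be sketched as a grouping of per-block sizes by location. `Location`, `Status` and `BlockId` are hypothetical stand-ins (for BlockManagerId, MapStatus and a shuffle block ID), and treating endPartition as exclusive is an assumption of the sketch.

```scala
object MapSizesSketch {
  // Hypothetical stand-ins for BlockManagerId, MapStatus and a shuffle block ID.
  final case class Location(executorId: String, host: String)
  final case class Status(location: Location, sizes: Array[Long])
  final case class BlockId(shuffleId: Int, mapId: Int, reduceId: Int)

  def mapSizesByLocation(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int, // assumption: exclusive upper bound
      statuses: Seq[Status]): Seq[(Location, Seq[(BlockId, Long)])] = {
    // One (location, (block, size)) entry per map output and requested reduce partition.
    val blocks = for {
      (status, mapId) <- statuses.zipWithIndex
      reduceId <- startPartition until endPartition
    } yield (status.location, (BlockId(shuffleId, mapId, reduceId), status.sizes(reduceId)))
    // Group the blocks by the location that serves them.
    blocks.groupBy(_._1).map { case (loc, bs) => loc -> bs.map(_._2) }.toSeq
  }
}
```

The order of locations in the result is unspecified in this sketch; the blocks of a single location keep the order in which the map outputs were traversed.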
Logging¶
Enable ALL logging level for org.apache.spark.MapOutputTrackerMaster logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.MapOutputTrackerMaster=ALL
Refer to Logging.