MapOutputTracker is a base abstraction of shuffle map output location registries that can find shuffle blocks by executor and deregister map output status information of a shuffle stage.
MapOutputTracker is available using SparkEnv (on the driver and executors).
MapOutputTracker takes a single SparkConf to be created.
trackerEndpoint is a RpcEndpointRef of the MapOutputTracker RPC endpoint.
trackerEndpoint is initialized (registered or looked up) when SparkEnv is created for the driver and executors.
trackerEndpoint is used to communicate (synchronously).
trackerEndpoint is cleared (null) when MapOutputTrackerMaster is requested to stop.
serializeMapStatuses( statuses: Array[MapStatus], broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]])
serializeMapStatuses serializes the given array of map output locations into an efficient byte format (to send to reduce tasks). serializeMapStatuses compresses the serialized bytes using GZIP. The bytes are expected to compress well because many map outputs will be on the same hostname.
Internally, serializeMapStatuses creates a Java ByteArrayOutputStream.
serializeMapStatuses writes out 0 (direct) first.
serializeMapStatuses creates a Java GZIPOutputStream (with the ByteArrayOutputStream created) and writes out the given statuses array.
serializeMapStatuses decides whether to return the output array (of the output stream) or use a broadcast variable based on the size of the byte array.
If the size of the result byte array is the given minBroadcastSize threshold or bigger, serializeMapStatuses requests the input BroadcastManager to create a broadcast variable.
serializeMapStatuses resets the ByteArrayOutputStream and starts over.
serializeMapStatuses writes out 1 (broadcast) first.
serializeMapStatuses creates a new Java GZIPOutputStream (with the ByteArrayOutputStream created) and writes out the broadcast variable.
serializeMapStatuses prints out the following INFO message to the logs:
Broadcast mapstatuses size = [length], actual size = [length]
serializeMapStatuses is used when ShuffleStatus is requested to serialize shuffle map output statuses.
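The direct-versus-broadcast decision described above can be sketched with plain Java streams. This is a minimal illustration under stated assumptions, not Spark's implementation: MapStatusSerializationSketch, its serialize and readPayload methods, the use of Array[String] in place of Array[MapStatus], and the broadcast function (a stand-in for BroadcastManager that returns a String handle) are all hypothetical. The sketch also assumes that the marker byte is written uncompressed in front of the GZIP payload and that the broadcast variable wraps the direct bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object MapStatusSerializationSketch {
  // Marker written before the compressed payload: 0 (direct) or 1 (broadcast).
  val Direct = 0
  val Broadcast = 1

  // Writes the marker byte, then the GZIP-compressed, Java-serialized payload.
  private def write(marker: Int, payload: AnyRef): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(marker)
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    try objOut.writeObject(payload) finally objOut.close()
    out.toByteArray
  }

  // `broadcast` is a hypothetical stand-in for BroadcastManager: it receives
  // the direct bytes and returns a serializable handle to them.
  def serialize(
      statuses: Array[String],
      minBroadcastSize: Int,
      broadcast: Array[Byte] => String): Array[Byte] = {
    val direct = write(Direct, statuses)
    if (direct.length >= minBroadcastSize) {
      // Large result: start over and write the broadcast handle instead.
      write(Broadcast, broadcast(direct))
    } else {
      direct
    }
  }

  // Hypothetical helper: skips the marker byte and reads the payload back.
  def readPayload(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes, 1, bytes.length - 1)))
    try in.readObject() finally in.close()
  }
}
```

A reader of such bytes would inspect the first byte to decide whether the payload holds the statuses themselves or a handle to a broadcast variable.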
deserializeMapStatuses( bytes: Array[Byte]): Array[MapStatus]
deserializeMapStatuses is used when…FIXME
sendTracker( message: Any): Unit
sendTracker is used when…FIXME
getMapSizesByExecutorId( shuffleId: Int, startPartition: Int, endPartition: Int): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
getMapSizesByExecutorId returns a collection of BlockManagerIds with their blocks and sizes.
When executed, you should see the following DEBUG message in the logs:
Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]
getMapSizesByExecutorId finds map outputs for the input shuffleId.
Note: getMapSizesByExecutorId gets the map outputs for all the partitions (despite the method’s signature).
getMapSizesByExecutorId is used when BlockStoreShuffleReader is requested to read combined records for a reduce task.
stop does nothing at all.
stop is used when SparkEnv is requested to stop (and stops all the services, incl. MapOutputTracker).
convertMapStatuses( shuffleId: Int, startPartition: Int, endPartition: Int, statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
convertMapStatuses iterates over the input statuses array (of MapStatus entries indexed by map id) and creates a collection of BlockManagerIds (one for each MapStatus entry), each with a ShuffleBlockId (with the input shuffleId, the map id, and a partition ranging from the input startPartition to the input endPartition) and the estimated size of the reduce block, for every status and partition.
For any empty MapStatus, you should see the following ERROR message in the logs:
Missing an output location for shuffle [id]
And convertMapStatuses throws a MetadataFetchFailedException (with shuffleId, startPartition, and the above error message).
convertMapStatuses is used when MapOutputTracker computes BlockManagerIds with their ShuffleBlockIds and sizes.
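The conversion can be sketched with simplified types. This is an illustration only: ConvertMapStatusesSketch, SimpleMapStatus, and SimpleShuffleBlockId are hypothetical stand-ins for Spark's classes, a plain String replaces BlockManagerId, and IllegalStateException replaces the exception Spark throws. The sketch also assumes that endPartition is exclusive and that blocks are grouped by location.

```scala
object ConvertMapStatusesSketch {
  // Hypothetical, simplified stand-ins for MapStatus and ShuffleBlockId.
  final case class SimpleMapStatus(location: String, sizes: Vector[Long])
  final case class SimpleShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

  def convert(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[SimpleMapStatus]): Seq[(String, Seq[(SimpleShuffleBlockId, Long)])] = {
    // One (location, (block id, size)) entry per map output and partition.
    val perBlock = statuses.toSeq.zipWithIndex.flatMap { case (status, mapId) =>
      if (status == null) {
        // A missing status means the map output location is unknown.
        throw new IllegalStateException(
          s"Missing an output location for shuffle $shuffleId")
      }
      // Assumption: endPartition is exclusive.
      (startPartition until endPartition).map { part =>
        (status.location, (SimpleShuffleBlockId(shuffleId, mapId, part), status.sizes(part)))
      }
    }
    // Group the blocks by the location that holds them.
    perBlock
      .groupBy(_._1)
      .map { case (location, entries) => (location, entries.map(_._2)) }
      .toSeq
  }
}
```

Grouping by location lets a reduce task request all the blocks it needs from one executor in a single batch.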
askTracker[T](message: Any): T
When an exception happens, askTracker throws a SparkException and you should see the following ERROR message in the logs:
Error communicating with MapOutputTracker
askTracker is used when MapOutputTracker fetches map outputs for a ShuffleDependency remotely and sends a one-way message.
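The error handling in askTracker can be sketched as a thin wrapper around a synchronous call. This is a sketch under assumptions, not Spark's code: AskTrackerSketch and the ask parameter (a hypothetical stand-in for the synchronous request to trackerEndpoint) are invented for illustration, the message is printed to standard error instead of going through a logger, and RuntimeException stands in for Spark's exception type.

```scala
object AskTrackerSketch {
  private val ErrorMessage = "Error communicating with MapOutputTracker"

  // `ask` is a hypothetical stand-in for a blocking RPC call to the tracker endpoint.
  def askTracker[T](message: Any)(ask: Any => T): T =
    try {
      ask(message)
    } catch {
      case e: Exception =>
        // Report the failure, then rethrow with the original exception as the cause.
        System.err.println(ErrorMessage)
        throw new RuntimeException(ErrorMessage, e)
    }
}
```

Keeping the original exception as the cause preserves the underlying failure for whoever handles the wrapped exception.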