MapOutputTracker¶
MapOutputTracker
is a base abstraction of shuffle map output location registries.
Contract¶
getMapSizesByExecutorId¶
getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
Used when:
SortShuffleManager
is requested for a ShuffleReader
getMapSizesByRange¶
getMapSizesByRange(
shuffleId: Int,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
Used when:
SortShuffleManager
is requested for a ShuffleReader
unregisterShuffle¶
unregisterShuffle(
shuffleId: Int): Unit
Deletes map output status information for the specified shuffle stage
Used when:
- ContextCleaner is requested to doCleanupShuffle
- BlockManagerSlaveEndpoint is requested to handle a RemoveShuffle message
Implementations¶
- MapOutputTrackerMaster (the driver)
- MapOutputTrackerWorker (executors)
Creating Instance¶
MapOutputTracker
takes the following to be created:
- SparkConf
Abstract Class
MapOutputTracker
is an abstract class and cannot be created directly. It is created indirectly for the concrete MapOutputTrackers.
Accessing MapOutputTracker¶
MapOutputTracker
is available using SparkEnv (on the driver and executors).
SparkEnv.get.mapOutputTracker
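For example, the following quick check (not part of the original page) shows the concrete tracker in use; on the driver (e.g. in spark-shell) it is the MapOutputTrackerMaster:

```scala
import org.apache.spark.SparkEnv

// On the driver (e.g. in spark-shell) the tracker is the MapOutputTrackerMaster;
// on executors it is a MapOutputTrackerWorker.
val tracker = SparkEnv.get.mapOutputTracker
println(tracker.getClass.getName)
// org.apache.spark.MapOutputTrackerMaster
```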
MapOutputTracker RPC Endpoint¶
trackerEndpoint
is a RpcEndpointRef of the MapOutputTracker RPC endpoint.
trackerEndpoint
is initialized (registered or looked up) when SparkEnv
is created for the driver and executors.
trackerEndpoint
is used to communicate (synchronously).
trackerEndpoint
is cleared (null
) when MapOutputTrackerMaster
is requested to stop.
Deregistering Map Output Status Information of Shuffle Stage¶
unregisterShuffle(
shuffleId: Int): Unit
Deregisters map output status information for the given shuffle stage
Used when:
- ContextCleaner is requested for shuffle cleanup
- BlockManagerSlaveEndpoint is requested to remove a shuffle
Stopping MapOutputTracker¶
stop(): Unit
stop
does nothing at all.
stop
is used when SparkEnv
is requested to stop (and stops all the services, incl. MapOutputTracker
).
Converting MapStatuses To BlockManagerIds with ShuffleBlockIds and Their Sizes¶
convertMapStatuses(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
convertMapStatuses
iterates over the input statuses
array (of MapStatus entries indexed by map id) and, for every MapStatus
and every partition (from the input startPartition
until endPartition
), associates the BlockManagerId
(of the MapStatus
) with a ShuffleBlockId (for the input shuffleId
, the mapId and the partition) and the estimated size of the reduce block.
For any empty MapStatus
, convertMapStatuses
prints out the following ERROR message to the logs:
Missing an output location for shuffle [id]
And convertMapStatuses
throws a MetadataFetchFailedException
(with shuffleId
, startPartition
, and the above error message).
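The following is a minimal sketch of that grouping, using simplified stand-in types (the real method works with Spark's MapStatus, BlockManagerId and ShuffleBlockId, and MapStatus estimates the reduce-block sizes internally):

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's types, for illustration only
case class BlockManagerId(executorId: String, host: String, port: Int)
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
case class MapStatus(location: BlockManagerId, sizes: Array[Long])

def convertMapStatusesSketch(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int,
    statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(ShuffleBlockId, Long)])] = {
  val splitsByAddress =
    mutable.LinkedHashMap.empty[BlockManagerId, mutable.ListBuffer[(ShuffleBlockId, Long)]]
  for ((status, mapId) <- statuses.zipWithIndex) {
    if (status == null) {
      // the real method logs the ERROR message above and throws a MetadataFetchFailedException
      sys.error(s"Missing an output location for shuffle $shuffleId")
    }
    for (part <- startPartition until endPartition) {
      // group (ShuffleBlockId, estimated size) pairs by the block manager that hosts them
      splitsByAddress.getOrElseUpdate(status.location, mutable.ListBuffer()) +=
        ((ShuffleBlockId(shuffleId, mapId, part), status.sizes(part)))
    }
  }
  splitsByAddress.iterator.map { case (bm, blocks) => (bm, blocks.toSeq) }.toSeq
}
```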
convertMapStatuses
is used when:
- MapOutputTrackerMaster is requested for the sizes of shuffle map outputs by executor and range
- MapOutputTrackerWorker is requested for the sizes of shuffle map outputs by executor and range
Sending Blocking Messages To trackerEndpoint RpcEndpointRef¶
askTracker[T](message: Any): T
askTracker
sends the input message
to trackerEndpoint RpcEndpointRef and waits for a result.
When an exception happens, askTracker
prints out the following ERROR message to the logs and throws a SparkException
.
Error communicating with MapOutputTracker
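The shape of the call is roughly the following sketch (self-contained with hypothetical stand-in types; in Spark the endpoint is the trackerEndpoint RpcEndpointRef with askSync, the exception is org.apache.spark.SparkException, and the real method also applies an RPC ask timeout):

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins so the sketch compiles on its own
trait EndpointRef { def askSync[T: ClassTag](message: Any): T }
class SparkException(msg: String, cause: Throwable) extends Exception(msg, cause)

class TrackerClientSketch(trackerEndpoint: EndpointRef) {
  def askTracker[T: ClassTag](message: Any): T = {
    try {
      // blocking (synchronous) ask on the MapOutputTracker RPC endpoint
      trackerEndpoint.askSync[T](message)
    } catch {
      case e: Exception =>
        // the real method prints out the ERROR message to the logs before rethrowing
        throw new SparkException("Error communicating with MapOutputTracker", e)
    }
  }
}
```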
askTracker
is used when MapOutputTracker
is requested to fetch map outputs for a ShuffleDependency
remotely and to send a one-way message.
Epoch¶
Starts from 0
when MapOutputTracker
is created.
Can be updated (on MapOutputTrackerWorkers
) or incremented (on the driver's MapOutputTrackerMaster
).
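A minimal, hypothetical sketch of the idea (simplified types, not Spark's actual classes): the driver increments the epoch when map outputs change, and a worker that sees a newer epoch drops its cached statuses:

```scala
import scala.collection.mutable

// Hypothetical, simplified illustration of an epoch-guarded cache
class EpochSketch {
  private val epochLock = new AnyRef
  private var epoch: Long = 0                                   // starts from 0
  private val cachedStatuses = mutable.Map.empty[Int, AnyRef]   // shuffleId -> statuses

  // driver side: bump the epoch after map outputs change
  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }

  // executor side: a newer epoch from the driver invalidates the local cache
  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cachedStatuses.clear()
    }
  }
}
```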
sendTracker¶
sendTracker(
message: Any): Unit
sendTracker
...FIXME
sendTracker
is used when:
MapOutputTrackerMaster
is requested to stop
Utilities¶
serializeMapStatuses¶
serializeMapStatuses(
statuses: Array[MapStatus],
broadcastManager: BroadcastManager,
isLocal: Boolean,
minBroadcastSize: Int,
conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]])
serializeMapStatuses
serializes the given array of map output locations into an efficient byte format (to send to reduce tasks). serializeMapStatuses
compresses the serialized bytes using GZIP. They are supposed to be pretty compressible because many map outputs will be on the same hostname.
Internally, serializeMapStatuses
creates a Java ByteArrayOutputStream.
serializeMapStatuses
writes out 0 (direct) first.
serializeMapStatuses
creates a Java GZIPOutputStream (with the ByteArrayOutputStream
created) and writes out the given statuses array.
serializeMapStatuses
decides whether to return the output array (of the output stream) or use a broadcast variable based on the size of the byte array.
If the size of the result byte array is the given minBroadcastSize
threshold or bigger, serializeMapStatuses
requests the input BroadcastManager
to create a broadcast variable.
serializeMapStatuses
resets the ByteArrayOutputStream
and starts over.
serializeMapStatuses
writes out 1 (broadcast) first.
serializeMapStatuses
creates a new Java GZIPOutputStream
(with the ByteArrayOutputStream
created) and writes out the broadcast variable.
serializeMapStatuses
prints out the following INFO message to the logs:
Broadcast mapstatuses size = [length], actual size = [length]
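A self-contained sketch of that flow (simplified and hypothetical: a plain payload stands in for the MapStatus array and the broadcast step is only simulated, whereas the real method uses Spark's BroadcastManager):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object SerializeSketch {
  val DIRECT = 0
  val BROADCAST = 1

  def serialize(statuses: Array[String], minBroadcastSize: Int): Array[Byte] = {
    def write(marker: Int, payload: AnyRef): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      out.write(marker)                            // 0 (direct) or 1 (broadcast) first
      val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
      objOut.writeObject(payload)                  // GZIP-compressed, Java-serialized payload
      objOut.close()
      out.toByteArray
    }

    val direct = write(DIRECT, statuses)
    if (direct.length < minBroadcastSize) {
      direct                                       // small enough to send directly
    } else {
      // the real method requests BroadcastManager for a broadcast variable of the
      // direct bytes, starts over and serializes (a reference to) the broadcast variable
      val broadcastRef = s"broadcast of ${direct.length} bytes"
      write(BROADCAST, broadcastRef)
    }
  }
}
```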
serializeMapStatuses
is used when ShuffleStatus
is requested to serialize shuffle map output statuses.
deserializeMapStatuses¶
deserializeMapStatuses(
bytes: Array[Byte],
conf: SparkConf): Array[MapStatus]
deserializeMapStatuses
...FIXME
deserializeMapStatuses
is used when:
MapOutputTrackerWorker
is requested to getStatuses
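Given the format produced by serializeMapStatuses above (a leading 0/1 marker byte followed by a GZIP-compressed, Java-serialized payload), a minimal sketch of the read side could look as follows (hypothetical and simplified; in the broadcast case the real method reads the broadcast variable and deserializes its value again):

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.util.zip.GZIPInputStream

def deserializeSketch(bytes: Array[Byte]): AnyRef = {
  require(bytes.nonEmpty, "empty serialized map statuses")
  def readPayload(): AnyRef = {
    val in = new ByteArrayInputStream(bytes, 1, bytes.length - 1)
    new ObjectInputStream(new GZIPInputStream(in)).readObject()
  }
  bytes(0).toInt match {
    case 0 => readPayload()            // DIRECT: the payload is the statuses array
    case 1 => readPayload()            // BROADCAST: the payload is a broadcast variable
    case m => sys.error(s"Unexpected marker: $m")
  }
}
```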