ShuffleBlockFetcherIterator¶
ShuffleBlockFetcherIterator is an Iterator[(BlockId, InputStream)] (Scala) that fetches shuffle blocks from local or remote BlockManagers (and makes them available as an InputStream).
ShuffleBlockFetcherIterator allows for a synchronous iteration over shuffle blocks so a caller can handle them in a pipelined fashion as they are received.
ShuffleBlockFetcherIterator is exhausted (and can provide no elements) when the number of blocks already processed is at least the total number of blocks to fetch.
ShuffleBlockFetcherIterator throttles the remote fetches to avoid consuming too much memory.
Creating Instance¶
ShuffleBlockFetcherIterator takes the following to be created:
- TaskContext
- BlockStoreClient
- BlockManager
- Blocks to Fetch by Address (Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])])
- Stream Wrapper Function ((BlockId, InputStream) => InputStream)
- spark.reducer.maxSizeInFlight
- spark.reducer.maxReqsInFlight
- spark.reducer.maxBlocksInFlightPerAddress
- spark.network.maxRemoteBlockSizeFetchToMem
- spark.shuffle.detectCorrupt
- spark.shuffle.detectCorrupt.useExtraMemory
- ShuffleReadMetricsReporter
- doBatchFetch flag
While being created, ShuffleBlockFetcherIterator initializes itself.
ShuffleBlockFetcherIterator is created when:
- BlockStoreShuffleReader is requested to read combined key-value records for a reduce task
Initializing¶
initialize(): Unit
initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.
Internally, initialize uses the TaskContext to register the ShuffleFetchCompletionListener (to cleanup).
initialize then runs partitionBlocksByFetchMode.
initialize...FIXME
partitionBlocksByFetchMode¶
partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest]
partitionBlocksByFetchMode...FIXME
collectFetchRequests¶
collectFetchRequests(
address: BlockManagerId,
blockInfos: Seq[(BlockId, Long, Int)],
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit
collectFetchRequests...FIXME
createFetchRequests¶
createFetchRequests(
curBlocks: Seq[FetchBlockInfo],
address: BlockManagerId,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo]
createFetchRequests...FIXME
fetchUpToMaxBytes¶
fetchUpToMaxBytes(): Unit
fetchUpToMaxBytes...FIXME
fetchUpToMaxBytes is used when:
- ShuffleBlockFetcherIterator is requested to initialize and next
Sending Remote Shuffle Block Fetch Request¶
sendRequest(
req: FetchRequest): Unit
sendRequest prints out the following DEBUG message to the logs:
Sending request for [n] blocks ([size]) from [hostPort]
sendRequest adds the size of the blocks in the FetchRequest to the bytesInFlight internal counter and increments the reqsInFlight internal counter.
sendRequest requests the BlockStoreClient to fetch the blocks with a new BlockFetchingListener (and passes this ShuffleBlockFetcherIterator along when the size of the blocks in the FetchRequest is higher than maxReqSizeShuffleToMem).
sendRequest is used when:
- ShuffleBlockFetcherIterator is requested to fetch remote shuffle blocks
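The bookkeeping that sendRequest performs can be sketched in plain Scala. This is a minimal, Spark-free illustration and not the actual implementation: the SendRequestSketch object, the simplified FetchRequest, the Fetcher class, and the Boolean return value are all hypothetical names introduced here.

```scala
object SendRequestSketch {
  // Hypothetical, simplified stand-in for Spark's FetchRequest
  final case class FetchRequest(hostPort: String, blockSizes: Seq[Long]) {
    def size: Long = blockSizes.sum
  }

  final class Fetcher(maxReqSizeShuffleToMem: Long) {
    var bytesInFlight: Long = 0L
    var reqsInFlight: Int = 0

    // Returns true when the request is large enough that the iterator
    // would also hand itself over to the client (an assumption of this sketch)
    def sendRequest(req: FetchRequest): Boolean = {
      println(s"Sending request for ${req.blockSizes.size} blocks " +
        s"(${req.size} bytes) from ${req.hostPort}")
      bytesInFlight += req.size // all bytes of the request are now in flight
      reqsInFlight += 1         // one more outstanding request
      req.size > maxReqSizeShuffleToMem
    }
  }
}
```

Both counters only grow here. In the description on this page they are decremented again as results are consumed in next.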
BlockFetchingListener¶
sendRequest creates a new BlockFetchingListener to be notified about successes or failures of shuffle block fetch requests.
onBlockFetchSuccess¶
On onBlockFetchSuccess, the BlockFetchingListener (when the ShuffleBlockFetcherIterator is not a zombie) adds a SuccessFetchResult to the results registry and prints out the following DEBUG message to the logs:
remainingBlocks: [remainingBlocks]
In the end, onBlockFetchSuccess prints out the following TRACE message to the logs:
Got remote block [blockId] after [time]
onBlockFetchFailure¶
On onBlockFetchFailure the BlockFetchingListener adds a FailureFetchResult to the results registry and prints out the following ERROR message to the logs:
Failed to get block(s) from [host]:[port]
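The two callbacks can be sketched as follows. This is a simplified illustration under stated assumptions: the FetchResult types carry only a block ID (plus a size or an error), and the Listener class with its isZombie function parameter is a hypothetical stand-in, not Spark's BlockFetchingListener interface.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ListenerSketch {
  // Hypothetical, simplified result types
  sealed trait FetchResult
  final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
  final case class FailureFetchResult(blockId: String, error: Throwable) extends FetchResult

  final class Listener(
      results: LinkedBlockingQueue[FetchResult],
      isZombie: () => Boolean) {

    // A success is recorded only while the iterator is still active
    def onBlockFetchSuccess(blockId: String, size: Long): Unit = {
      if (!isZombie()) results.put(SuccessFetchResult(blockId, size))
    }

    // A failure is always reported to the consumer through the same queue
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
      results.put(FailureFetchResult(blockId, e))
    }
  }
}
```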
FetchResults¶
results: LinkedBlockingQueue[FetchResult]
ShuffleBlockFetcherIterator uses an internal FIFO blocking queue (Java) of FetchResults.
results is used for fetching the next element.
For remote blocks, FetchResults are added in sendRequest:
- SuccessFetchResults after a BlockFetchingListener is notified about onBlockFetchSuccess
- FailureFetchResults after a BlockFetchingListener is notified about onBlockFetchFailure
For local blocks, FetchResults are added in fetchLocalBlocks:
- SuccessFetchResults after the BlockManager has successfully getLocalBlockData
- FailureFetchResults otherwise
For host-local blocks, FetchResults are added in fetchHostLocalBlock:
- SuccessFetchResults after the BlockManager has successfully getHostLocalShuffleData
- FailureFetchResults otherwise
FailureFetchResults can also be added in fetchHostLocalBlocks.
results is cleaned up in cleanup.
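The blocking FIFO behaviour of such a queue can be illustrated with a short sketch. It assumes a single producer thread that stands in for asynchronous fetches. The ResultsQueueSketch object and its consume method are hypothetical, and plain strings replace FetchResults.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ResultsQueueSketch {
  // Results arrive on another thread; the consumer blocks on take()
  // until the next one is available and sees them in arrival order
  def consume(blockIds: Seq[String]): Seq[String] = {
    val results = new LinkedBlockingQueue[String]()
    val producer = new Thread(() => blockIds.foreach { id =>
      Thread.sleep(10) // simulate network latency
      results.put(id)
    })
    producer.start()
    val received = blockIds.map(_ => results.take())
    producer.join()
    received
  }
}
```

Because take() blocks, the consumer can start working on the first block while later ones are still in transit.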
hasNext¶
hasNext: Boolean
hasNext is part of the Iterator (Scala) abstraction (to test whether this iterator can provide another element).
hasNext is true when numBlocksProcessed is below numBlocksToFetch.
Retrieving Next Element¶
next(): (BlockId, InputStream)
next increments the numBlocksProcessed registry.
next takes (and removes) the head of the results queue.
next requests the ShuffleReadMetricsReporter to incFetchWaitTime.
next...FIXME
next throws a NoSuchElementException if there is no element left.
next is part of the Iterator (Scala) abstraction (to produce the next element of this iterator).
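The interplay of hasNext and next described above can be sketched as a small iterator over a blocking queue. This is an illustration only: BlockIterator, the (String, Array[Byte]) element type, and the fetchWaitTimeMs field (standing in for the metrics reporter) are assumptions of this sketch, and the steps marked FIXME above are not modelled.

```scala
import java.util.concurrent.LinkedBlockingQueue

object NextSketch {
  final class BlockIterator(
      results: LinkedBlockingQueue[(String, Array[Byte])],
      numBlocksToFetch: Int) extends Iterator[(String, Array[Byte])] {

    private var numBlocksProcessed = 0
    var fetchWaitTimeMs = 0L // hypothetical stand-in for incFetchWaitTime

    override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

    override def next(): (String, Array[Byte]) = {
      if (!hasNext) throw new NoSuchElementException("no more blocks")
      numBlocksProcessed += 1
      val start = System.currentTimeMillis()
      val result = results.take() // blocks until a result is available
      fetchWaitTimeMs += System.currentTimeMillis() - start
      result
    }
  }
}
```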
numBlocksProcessed¶
The number of blocks fetched and consumed
numBlocksToFetch¶
Total number of blocks to fetch and consume
ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.
numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to partitionBlocksByFetchMode, which then prints it out in the following INFO message to the logs:
Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
releaseCurrentResultBuffer¶
releaseCurrentResultBuffer(): Unit
releaseCurrentResultBuffer...FIXME
releaseCurrentResultBuffer is used when:
- ShuffleBlockFetcherIterator is requested to cleanup
- BufferReleasingInputStream is requested to close
ShuffleFetchCompletionListener¶
ShuffleBlockFetcherIterator creates a ShuffleFetchCompletionListener when created.
ShuffleFetchCompletionListener is used in initialize and toCompletionIterator.
Cleaning Up¶
cleanup(): Unit
cleanup marks this ShuffleBlockFetcherIterator as a zombie.
cleanup releases the current result buffer.
cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and blocks fetched shuffle task metrics and eventually releases the managed buffer.
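A minimal sketch of these cleanup steps, assuming simplified result types: the State class, the released flag, and the two counter fields are hypothetical stand-ins for the managed buffer and the shuffle task metrics. Releasing the current result buffer is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CleanupSketch {
  sealed trait FetchResult
  final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult {
    var released = false // stands in for releasing a managed buffer
    def release(): Unit = { released = true }
  }
  final case class FailureFetchResult(blockId: String) extends FetchResult

  final class State(val results: LinkedBlockingQueue[FetchResult]) {
    var isZombie = false
    var remoteBytesRead = 0L
    var remoteBlocksFetched = 0L

    def cleanup(): Unit = {
      isZombie = true // no further successes should be recorded
      val iter = results.iterator()
      while (iter.hasNext()) {
        iter.next() match {
          case r: SuccessFetchResult =>
            remoteBytesRead += r.size
            remoteBlocksFetched += 1
            r.release()
          case _ => () // failures hold no buffer to release
        }
      }
    }
  }
}
```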
bytesInFlight¶
The bytes of fetched remote shuffle blocks in flight
Starts at 0 when ShuffleBlockFetcherIterator is created
Incremented every sendRequest and decremented every next.
ShuffleBlockFetcherIterator maintains the invariant that bytesInFlight stays below maxBytesInFlight on every remote shuffle block fetch.
reqsInFlight¶
The number of remote shuffle block fetch requests in flight.
Starts at 0 when ShuffleBlockFetcherIterator is created
Incremented every sendRequest and decremented every next.
ShuffleBlockFetcherIterator maintains the invariant that reqsInFlight stays below maxReqsInFlight on every remote shuffle block fetch.
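The bytesInFlight and reqsInFlight limits can be combined into one admission check before a request is sent. The sketch below is an assumption about the shape of such a check, not Spark's code: canSend is a hypothetical helper, and the exact comparisons Spark uses may differ.

```scala
object ThrottleSketch {
  // Hypothetical check: may another request of the given size be sent now
  // without exceeding either the byte limit or the request-count limit?
  def canSend(
      reqSize: Long,
      bytesInFlight: Long,
      reqsInFlight: Int,
      maxBytesInFlight: Long,
      maxReqsInFlight: Int): Boolean =
    bytesInFlight + reqSize <= maxBytesInFlight &&
      reqsInFlight + 1 <= maxReqsInFlight
}
```

A request that fails the check would stay queued until consumed results lower the counters again.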
isZombie¶
Controls whether ShuffleBlockFetcherIterator is still active and records SuccessFetchResults on successful shuffle block fetches.
Starts false when ShuffleBlockFetcherIterator is created
Enabled (true) in cleanup.
When enabled, registerTempFileToClean is a noop.
DownloadFileManager¶
ShuffleBlockFetcherIterator is a DownloadFileManager.
throwFetchFailedException¶
throwFetchFailedException(
blockId: BlockId,
mapIndex: Int,
address: BlockManagerId,
e: Throwable,
message: Option[String] = None): Nothing
throwFetchFailedException takes the message (if defined) or uses the message of the given Throwable.
In the end, throwFetchFailedException throws a FetchFailedException if the BlockId is either a ShuffleBlockId or a ShuffleBlockBatchId. Otherwise, throwFetchFailedException throws a SparkException:
Failed to get block [blockId], which is not a shuffle block
throwFetchFailedException is used when:
- ShuffleBlockFetcherIterator is requested to next
- BufferReleasingInputStream is requested to tryOrFetchFailedException
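The branching in throwFetchFailedException can be sketched as follows. All types here are hypothetical, simplified stand-ins defined for the example (including the two exception classes), and the mapIndex and address parameters are left out for brevity.

```scala
object ThrowSketch {
  // Simplified stand-ins for Spark's block ID types
  sealed trait BlockId
  final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
  final case class ShuffleBlockBatchId(
      shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) extends BlockId
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

  // Hypothetical stand-ins for Spark's exception types
  final class FetchFailedException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)
  final class SparkException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

  def throwFetchFailedException(
      blockId: BlockId,
      e: Throwable,
      message: Option[String] = None): Nothing = {
    // Prefer the explicit message, else fall back to the cause's message
    val msg = message.getOrElse(e.getMessage)
    blockId match {
      case _: ShuffleBlockId | _: ShuffleBlockBatchId =>
        throw new FetchFailedException(msg, e)
      case _ =>
        throw new SparkException(
          s"Failed to get block $blockId, which is not a shuffle block", e)
    }
  }
}
```

The Nothing return type signals to the compiler that the method never returns normally, so callers can use it in any expression position.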
Logging¶
Enable the ALL logging level for the org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=ALL
Refer to Logging.