ShuffleBlockFetcherIterator¶
ShuffleBlockFetcherIterator is an Iterator[(BlockId, InputStream)] (Scala) that fetches shuffle blocks from local or remote BlockManagers and makes them available as InputStreams.
ShuffleBlockFetcherIterator allows for a synchronous iteration over shuffle blocks so a caller can handle them in a pipelined fashion as they are received.
ShuffleBlockFetcherIterator is exhausted (and can provide no elements) when the number of blocks already processed is at least the total number of blocks to fetch.
ShuffleBlockFetcherIterator throttles the remote fetches to avoid consuming too much memory.
Creating Instance¶
ShuffleBlockFetcherIterator takes the following to be created:
- TaskContext
- BlockStoreClient
- BlockManager
- Blocks to Fetch by Address (Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])])
- Stream Wrapper Function ((BlockId, InputStream) => InputStream)
- spark.reducer.maxSizeInFlight
- spark.reducer.maxReqsInFlight
- spark.reducer.maxBlocksInFlightPerAddress
- spark.network.maxRemoteBlockSizeFetchToMem
- spark.shuffle.detectCorrupt
- spark.shuffle.detectCorrupt.useExtraMemory
- ShuffleReadMetricsReporter
- doBatchFetch flag
While being created, ShuffleBlockFetcherIterator initializes itself.
ShuffleBlockFetcherIterator is created when:
- BlockStoreShuffleReader is requested to read combined key-value records for a reduce task
Initializing¶
initialize(): Unit
initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.
Internally, initialize uses the TaskContext to register the ShuffleFetchCompletionListener (to clean up).
initialize partitions the blocks by fetch mode (partitionBlocksByFetchMode).
initialize ...FIXME
partitionBlocksByFetchMode¶
partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest]
partitionBlocksByFetchMode ...FIXME
collectFetchRequests¶
collectFetchRequests(
address: BlockManagerId,
blockInfos: Seq[(BlockId, Long, Int)],
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit
collectFetchRequests ...FIXME
createFetchRequests¶
createFetchRequests(
curBlocks: Seq[FetchBlockInfo],
address: BlockManagerId,
isLast: Boolean,
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo]
createFetchRequests ...FIXME
fetchUpToMaxBytes¶
fetchUpToMaxBytes(): Unit
fetchUpToMaxBytes ...FIXME
fetchUpToMaxBytes is used when:
- ShuffleBlockFetcherIterator is requested to initialize and next
Sending Remote Shuffle Block Fetch Request¶
sendRequest(
req: FetchRequest): Unit
sendRequest prints out the following DEBUG message to the logs:
Sending request for [n] blocks ([size]) from [hostPort]
sendRequest adds the size of the blocks in the FetchRequest to bytesInFlight and increments reqsInFlight (both internal counters).
sendRequest requests the BlockStoreClient to fetch the blocks with a new BlockFetchingListener (and passes this ShuffleBlockFetcherIterator along when the size of the blocks in the FetchRequest is higher than maxReqSizeShuffleToMem).
sendRequest is used when:
- ShuffleBlockFetcherIterator is requested to fetch remote shuffle blocks
BlockFetchingListener¶
sendRequest creates a new BlockFetchingListener to be notified about successes or failures of shuffle block fetch requests.
onBlockFetchSuccess¶
On onBlockFetchSuccess, the BlockFetchingListener adds a SuccessFetchResult to the results registry and prints out the following DEBUG message to the logs (when not a zombie):
remainingBlocks: [remainingBlocks]
In the end, onBlockFetchSuccess prints out the following TRACE message to the logs:
Got remote block [blockId] after [time]
onBlockFetchFailure¶
On onBlockFetchFailure, the BlockFetchingListener adds a FailureFetchResult to the results registry and prints out the following ERROR message to the logs:
Failed to get block(s) from [host]:[port]
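The listener behaviour described above can be sketched in a few lines of self-contained Scala. This is only an illustration under stated assumptions: ListenerSketch and the simplified SuccessFetchResult and FailureFetchResult case classes are hypothetical stand-ins (block IDs are plain strings), not Spark's internal types, and logging is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's internal result types.
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
final case class FailureFetchResult(blockId: String, error: Throwable) extends FetchResult

// Models a listener that reports fetch outcomes to a shared FIFO queue.
class ListenerSketch(blockIds: Seq[String]) {
  val results = new LinkedBlockingQueue[FetchResult]()
  @volatile var isZombie = false
  private val remainingBlocks = mutable.HashSet[String]()
  remainingBlocks ++= blockIds

  def onBlockFetchSuccess(blockId: String, size: Long): Unit = synchronized {
    // A zombie (already cleaned up) iterator does not record late successes.
    if (!isZombie) {
      remainingBlocks -= blockId
      results.put(SuccessFetchResult(blockId, size))
    }
  }

  def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    // Failures are always reported so the consumer can react to them.
    results.put(FailureFetchResult(blockId, e))
  }

  def remaining: Int = synchronized(remainingBlocks.size)
}
```

Because both callbacks only enqueue results, the consumer side can stay single-threaded and simply take elements off the queue in arrival order.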
FetchResults¶
results: LinkedBlockingQueue[FetchResult]
ShuffleBlockFetcherIterator uses an internal FIFO blocking queue (Java) of FetchResults.
results is used for fetching the next element.
For remote blocks, FetchResults are added in sendRequest:
- SuccessFetchResults after a BlockFetchingListener is notified about onBlockFetchSuccess
- FailureFetchResults after a BlockFetchingListener is notified about onBlockFetchFailure
For local blocks, FetchResults are added in fetchLocalBlocks:
- SuccessFetchResults after getLocalBlockData on the BlockManager succeeds
- FailureFetchResults otherwise
For host-local blocks, FetchResults are added in fetchHostLocalBlock:
- SuccessFetchResults after getHostLocalShuffleData on the BlockManager succeeds
- FailureFetchResults otherwise
FailureFetchResults can also be added in fetchHostLocalBlocks.
Cleaned up in cleanup
hasNext¶
hasNext: Boolean
hasNext is part of the Iterator (Scala) abstraction (to test whether this iterator can provide another element).
hasNext is true when numBlocksProcessed is below numBlocksToFetch.
Retrieving Next Element¶
next(): (BlockId, InputStream)
next increments the numBlocksProcessed registry.
next takes (and removes) the head of the results queue.
next requests the ShuffleReadMetricsReporter to incFetchWaitTime.
next ...FIXME
next throws a NoSuchElementException if there is no element left.
next is part of the Iterator (Scala) abstraction (to produce the next element of this iterator).
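The hasNext and next contract described above can be illustrated with a small queue-backed iterator. This is a minimal sketch, not Spark's implementation: QueueBackedIterator is a hypothetical name, the element type is generic, and metrics reporting and the elided steps are left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical model: results arrive on a blocking queue and the iterator
// hands them out until the expected number of blocks has been processed.
class QueueBackedIterator[A](results: LinkedBlockingQueue[A], numBlocksToFetch: Int)
    extends Iterator[A] {
  private var numBlocksProcessed = 0

  // True while fewer blocks have been processed than are expected in total.
  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("no more blocks")
    numBlocksProcessed += 1
    // Takes (and removes) the head of the queue, waiting if it is empty.
    results.take()
  }
}
```

Counting processed blocks rather than checking whether the queue is empty matters here: an empty queue may only mean that a fetch is still in flight, not that the iterator is exhausted.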
numBlocksProcessed¶
The number of blocks fetched and consumed
numBlocksToFetch¶
Total number of blocks to fetch and consume
ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.
numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to partitionBlocksByFetchMode, which prints it out in the following INFO message to the logs:
Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
releaseCurrentResultBuffer¶
releaseCurrentResultBuffer(): Unit
releaseCurrentResultBuffer ...FIXME
releaseCurrentResultBuffer is used when:
- ShuffleBlockFetcherIterator is requested to cleanup
- BufferReleasingInputStream is requested to close
ShuffleFetchCompletionListener¶
ShuffleBlockFetcherIterator creates a ShuffleFetchCompletionListener when created.
ShuffleFetchCompletionListener is used in initialize and toCompletionIterator.
Cleaning Up¶
cleanup(): Unit
cleanup marks this ShuffleBlockFetcherIterator as a zombie.
cleanup releases the current result buffer.
cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.
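The cleanup steps above amount to flipping the zombie flag, releasing the buffer currently being consumed, and draining the queue. A minimal sketch follows, assuming a hypothetical BufferSketch in place of Spark's managed buffers and omitting the metrics updates:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a releasable managed buffer.
final class BufferSketch(val size: Long) {
  var released = false
  def release(): Unit = { released = true }
}

class CleanupSketch {
  val results = new LinkedBlockingQueue[BufferSketch]()
  var isZombie = false
  var currentResult: Option[BufferSketch] = None

  def cleanup(): Unit = {
    // Mark as zombie so late results are no longer recorded.
    isZombie = true
    // Release the buffer that was being consumed, if any.
    currentResult.foreach(_.release())
    currentResult = None
    // Drain results that were fetched but never consumed.
    var buf = results.poll()
    while (buf != null) {
      buf.release()
      buf = results.poll()
    }
  }
}
```

The sketch uses the non-blocking poll rather than take so that cleanup never waits on an empty queue.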
bytesInFlight¶
The bytes of fetched remote shuffle blocks in flight
Starts at 0 when ShuffleBlockFetcherIterator is created
Incremented on every sendRequest and decremented on every next.
ShuffleBlockFetcherIterator maintains the invariant that bytesInFlight is below maxBytesInFlight on every remote shuffle block fetch.
reqsInFlight¶
The number of remote shuffle block fetch requests in flight.
Starts at 0 when ShuffleBlockFetcherIterator is created
Incremented on every sendRequest and decremented on every next.
ShuffleBlockFetcherIterator maintains the invariant that reqsInFlight is below maxReqsInFlight on every remote shuffle block fetch.
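The interplay of bytesInFlight and reqsInFlight with their limits can be modelled as below. This is a sketch under assumptions rather than Spark's code: InFlightSketch, canSend, and onResultConsumed are hypothetical names, each request is treated as a single block, and the exact admission rule (including always allowing a request when nothing is in flight) is an assumption of this example.

```scala
// Hypothetical model of the two in-flight counters and their limits.
class InFlightSketch(maxBytesInFlight: Long, maxReqsInFlight: Int) {
  var bytesInFlight = 0L
  var reqsInFlight = 0

  // Assumed rule: always allow a request when nothing is in flight,
  // otherwise require both limits to hold after sending.
  def canSend(reqSize: Long): Boolean =
    bytesInFlight == 0 ||
      (reqsInFlight + 1 <= maxReqsInFlight && bytesInFlight + reqSize <= maxBytesInFlight)

  // Mirrors the bookkeeping done when a request is sent.
  def sendRequest(reqSize: Long): Unit = {
    bytesInFlight += reqSize
    reqsInFlight += 1
  }

  // Mirrors the bookkeeping done when a result is consumed.
  def onResultConsumed(reqSize: Long): Unit = {
    bytesInFlight -= reqSize
    reqsInFlight -= 1
  }
}
```

A caller would check canSend before each sendRequest and defer the request otherwise, which is what keeps memory use bounded while fetches overlap.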
isZombie¶
Controls whether ShuffleBlockFetcherIterator is still active and records SuccessFetchResults on successful shuffle block fetches.
Starts false when ShuffleBlockFetcherIterator is created
Enabled (true) in cleanup.
When enabled, registerTempFileToClean is a noop.
DownloadFileManager¶
ShuffleBlockFetcherIterator is a DownloadFileManager.
throwFetchFailedException¶
throwFetchFailedException(
blockId: BlockId,
mapIndex: Int,
address: BlockManagerId,
e: Throwable,
message: Option[String] = None): Nothing
throwFetchFailedException takes the message (if defined) or uses the message of the given Throwable.
In the end, throwFetchFailedException throws a FetchFailedException if the BlockId is either a ShuffleBlockId or a ShuffleBlockBatchId. Otherwise, throwFetchFailedException throws a SparkException:
Failed to get block [blockId], which is not a shuffle block
throwFetchFailedException is used when:
- ShuffleBlockFetcherIterator is requested to next
- BufferReleasingInputStream is requested to tryOrFetchFailedException
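The message selection and the dispatch on the block ID type can be sketched as follows. All types here are hypothetical stand-ins so the example stays self-contained: BlockIdSketch and its subclasses replace Spark's BlockId hierarchy, FetchFailedSketch replaces FetchFailedException, and IllegalStateException stands in for SparkException.

```scala
// Hypothetical stand-ins; Spark's real BlockId hierarchy and exceptions differ.
sealed trait BlockIdSketch
final case class ShuffleBlockIdSketch(shuffleId: Int, mapId: Long, reduceId: Int)
    extends BlockIdSketch
final case class OtherBlockIdSketch(name: String) extends BlockIdSketch

final class FetchFailedSketch(msg: String, cause: Throwable) extends Exception(msg, cause)

object ThrowSketch {
  def throwFetchFailed(
      blockId: BlockIdSketch,
      e: Throwable,
      message: Option[String] = None): Nothing = {
    // Prefer the explicit message, fall back to the message of the cause.
    val msg = message.getOrElse(e.getMessage)
    blockId match {
      case _: ShuffleBlockIdSketch =>
        throw new FetchFailedSketch(msg, e)
      case other =>
        // Stand-in for the SparkException thrown for non-shuffle blocks.
        throw new IllegalStateException(
          s"Failed to get block $other, which is not a shuffle block", e)
    }
  }
}
```

Declaring the return type as Nothing lets callers use the method in any expression position, since the compiler knows it never returns normally.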
Logging¶
Enable ALL logging level for org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=ALL
Refer to Logging.