= ShuffleBlockFetcherIterator
ShuffleBlockFetcherIterator is a Scala http://www.scala-lang.org/api/current/scala/collection/Iterator.html[Iterator] that fetches shuffle blocks (aka shuffle map outputs) from block managers.

ShuffleBlockFetcherIterator is <<creating-instance, created>> when BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read combined key-value records for a reduce task].

ShuffleBlockFetcherIterator allows for <<next, iterating over a stream of>> (BlockId, InputStream) pairs so a caller can handle shuffle blocks in a pipelined fashion as they are received.

ShuffleBlockFetcherIterator is exhausted (i.e. <<hasNext, hasNext>> is false) once the number of <<numBlocksProcessed, blocks already processed>> reaches the <<numBlocksToFetch, total number of blocks to fetch>>.

ShuffleBlockFetcherIterator <<cleanup, releases the resources>> (e.g. the managed buffers of fetched results) when the owning task completes.
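The following is a minimal sketch (not the actual BlockStoreShuffleReader code; the iterator and serializerInstance values are assumed to be in scope) of how a caller can consume the pairs lazily so records flow in as blocks arrive:

[source, scala]
----
// `iterator` is a ShuffleBlockFetcherIterator created for a reduce task and
// `serializerInstance` is a SerializerInstance (both assumed to be in scope).
// Every element is a (BlockId, InputStream) pair backed by a fetched (or local) shuffle block.
val recordIter = iterator.flatMap { case (blockId, inputStream) =>
  // Deserialization is lazy, so shuffle blocks are consumed in a pipelined fashion
  serializerInstance.deserializeStream(inputStream).asKeyValueIterator
}
----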
[[internal-registries]]
.ShuffleBlockFetcherIterator's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description
| numBlocksProcessed
| [[numBlocksProcessed]] The number of blocks already processed (i.e. <<next, fetched and returned>>)
| numBlocksToFetch
a| [[numBlocksToFetch]] Total number of blocks to fetch and process.

ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.

numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to <<splitLocalRemoteBlocks, splitLocalRemoteBlocks>>, which prints out the following INFO message to the logs:
Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
| [[results]] results
a| Internal FIFO blocking queue (using Java's https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html[java.util.concurrent.LinkedBlockingQueue]) to hold FetchResult remote and local fetch results.

Used in:

- <<next, next>> to take one FetchResult off the queue
- <<sendRequest, sendRequest>> to put SuccessFetchResult or FailureFetchResult remote fetch results (as part of the BlockFetchingListener callback)
- <<fetchLocalBlocks, fetchLocalBlocks>> (similarly to <<sendRequest, sendRequest>>) to put local fetch results
- <<cleanup, cleanup>> to release managed buffers for SuccessFetchResult results
| [[maxBytesInFlight]] maxBytesInFlight
a| The maximum size (in bytes) of all the remote shuffle blocks to fetch.

Set when ShuffleBlockFetcherIterator <<creating-instance, is created>>.
| [[maxReqsInFlight]] maxReqsInFlight
a| The maximum number of remote requests to fetch shuffle blocks.

Set when ShuffleBlockFetcherIterator <<creating-instance, is created>>.
| [[bytesInFlight]] bytesInFlight
a| The bytes of fetched remote shuffle blocks in flight.

Starts at 0 when ShuffleBlockFetcherIterator <<creating-instance, is created>>.

Incremented every <<sendRequest, sendRequest>> and decremented every <<next, next>>.

ShuffleBlockFetcherIterator makes sure that the invariant of bytesInFlight being below <<maxBytesInFlight, maxBytesInFlight>> holds every remote shuffle block fetch.
| [[reqsInFlight]] reqsInFlight
a| The number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator <<creating-instance, is created>>.

Incremented every <<sendRequest, sendRequest>> and decremented every <<next, next>>.

ShuffleBlockFetcherIterator makes sure that the invariant of reqsInFlight being below <<maxReqsInFlight, maxReqsInFlight>> holds every remote shuffle block fetch.
| [[isZombie]] isZombie
a| Flag whether ShuffleBlockFetcherIterator is still active. It is disabled, i.e. false, when ShuffleBlockFetcherIterator <<creating-instance, is created>>.

When enabled (in <<cleanup, cleanup>>), the remote fetch callbacks (registered in <<sendRequest, sendRequest>>) will no longer add fetched remote shuffle blocks into the <<results, results>> internal queue.
| [[currentResult]] currentResult
a| The currently-processed SuccessFetchResult.

Set when ShuffleBlockFetcherIterator is requested for the <<next, next element>> and reset when the <<releaseCurrentResultBuffer, current result buffer is released>>.
|===
[TIP]
====
Enable ERROR, WARN, INFO, DEBUG or TRACE logging levels for org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens in ShuffleBlockFetcherIterator.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE
----

Refer to spark-logging.md[Logging].
====
== [[fetchUpToMaxBytes]] fetchUpToMaxBytes Method
CAUTION: FIXME
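Even though the section is yet to be written, the <<bytesInFlight, bytesInFlight>> and <<reqsInFlight, reqsInFlight>> invariants described above hint at the throttling fetchUpToMaxBytes performs. The following is a simplified, self-contained model (not the actual Spark code; FetchRequest, fetchRequests and sendRequest are stand-ins for the iterator's internal state):

[source, scala]
----
import scala.collection.mutable

// Stand-in for the internal FetchRequest (only the total size matters here)
case class FetchRequest(size: Long)

val maxBytesInFlight = 48L * 1024 * 1024   // spark.reducer.maxSizeInFlight
val maxReqsInFlight  = Int.MaxValue        // spark.reducer.maxReqsInFlight
var bytesInFlight = 0L
var reqsInFlight  = 0
val fetchRequests = mutable.Queue(FetchRequest(1024), FetchRequest(2048))

def sendRequest(req: FetchRequest): Unit = {
  bytesInFlight += req.size
  reqsInFlight  += 1
}

// Keep sending queued remote requests while the in-flight limits are not exceeded
while (fetchRequests.nonEmpty &&
    reqsInFlight + 1 <= maxReqsInFlight &&
    bytesInFlight + fetchRequests.head.size <= maxBytesInFlight) {
  sendRequest(fetchRequests.dequeue())
}
----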
== [[creating-instance]] Creating Instance

When created, ShuffleBlockFetcherIterator takes the following:

- [[context]] TaskContext
- [[shuffleClient]] storage:ShuffleClient.md[]
- [[blockManager]] storage:BlockManager.md[BlockManager]
- [[blocksByAddress]] Blocks to fetch per storage:BlockManager.md[BlockManager] (as Seq[(BlockManagerId, Seq[(BlockId, Long)])])
- [[streamWrapper]] Function to wrap the returned input stream (as (BlockId, InputStream) => InputStream)
- <<maxBytesInFlight, maxBytesInFlight>> -- the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxSizeInFlight[spark.reducer.maxSizeInFlight] Spark property)
- <<maxReqsInFlight, maxReqsInFlight>> -- the maximum number of remote requests to fetch blocks at any given point (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxReqsInFlight[spark.reducer.maxReqsInFlight] Spark property)
- [[maxBlocksInFlightPerAddress]] maxBlocksInFlightPerAddress
- [[maxReqSizeShuffleToMem]] maxReqSizeShuffleToMem
- [[detectCorrupt]] detectCorrupt flag to detect any corruption in fetched blocks (controlled by shuffle:BlockStoreShuffleReader.md#spark_shuffle_detectCorrupt[spark.shuffle.detectCorrupt] Spark property)
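The following is a simplified sketch (not verbatim Spark code; exact parameters and config handling vary across Spark versions) of how shuffle:BlockStoreShuffleReader.md#read[BlockStoreShuffleReader] typically wires the iterator together. The names context, blockManager, mapOutputTracker, serializerManager, conf, handle, startPartition, endPartition, maxBlocksInFlightPerAddress and maxReqSizeShuffleToMem are assumed to be in scope there:

[source, scala]
----
// Parameter order follows the list above; config keys are the ones mentioned on this page.
val blockFetcherItr = new ShuffleBlockFetcherIterator(
  context,                                    // TaskContext of the reduce task
  blockManager.shuffleClient,                 // ShuffleClient used for remote fetches
  blockManager,                               // local BlockManager
  mapOutputTracker.getMapSizesByExecutorId(   // blocks to fetch per BlockManagerId
    handle.shuffleId, startPartition, endPartition),
  serializerManager.wrapStream,               // streamWrapper (decryption / decompression)
  conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,  // maxBytesInFlight
  conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),              // maxReqsInFlight
  maxBlocksInFlightPerAddress,
  maxReqSizeShuffleToMem,
  conf.getBoolean("spark.shuffle.detectCorrupt", true))                    // detectCorrupt
----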
== [[initialize]] Initializing ShuffleBlockFetcherIterator -- initialize Internal Method

[source, scala]
----
initialize(): Unit
----
initialize registers a task cleanup and fetches shuffle blocks from remote and local storage:BlockManager.md[BlockManagers].

Internally, initialize registers a TaskCompletionListener (that will <<cleanup, clean up>> right after the task finishes).

initialize <<splitLocalRemoteBlocks, splits local and remote blocks>> and enqueues the remote fetch requests (in the internal fetchRequests queue).

As ShuffleBlockFetcherIterator is in its initialization phase, initialize makes sure that the <<reqsInFlight, reqsInFlight>> and <<bytesInFlight, bytesInFlight>> internal counters are both 0. Otherwise, initialize throws an exception.

initialize <<fetchUpToMaxBytes, fetches remote shuffle blocks>> (up to <<maxBytesInFlight, maxBytesInFlight>>).
You should see the following INFO message in the logs:
INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms
initialize <<fetchLocalBlocks, fetches local shuffle blocks>>.
You should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: Got local blocks in [time] ms
NOTE: initialize is used exclusively when ShuffleBlockFetcherIterator is <<creating-instance, created>>.
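The following is a simplified outline (an assumed structure, not verbatim Spark code) of the steps initialize performs, in order:

[source, scala]
----
def initialize(): Unit = {
  // 1. Release all buffers when the task completes, whether it succeeds or fails
  context.addTaskCompletionListener(_ => cleanup())

  // 2. Split the blocks to fetch into local and remote ones; queue the remote requests
  fetchRequests ++= splitLocalRemoteBlocks()

  // 3. Nothing may be in flight yet at this point
  assert(reqsInFlight == 0 && bytesInFlight == 0)

  // 4. Send the initial remote requests, up to maxBytesInFlight / maxReqsInFlight
  fetchUpToMaxBytes()

  // 5. Serve the blocks that are stored in the local BlockManager
  fetchLocalBlocks()
}
----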
== [[sendRequest]] Sending Remote Shuffle Block Fetch Request -- sendRequest Internal Method

[source, scala]
----
sendRequest(req: FetchRequest): Unit
----
Internally, when sendRequest runs, you should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]
sendRequest increments the <<bytesInFlight, bytesInFlight>> and <<reqsInFlight, reqsInFlight>> internal counters.
NOTE: The input FetchRequest contains the remote storage:BlockManagerId.md[] address and the shuffle blocks to fetch (as a sequence of storage:BlockId.md[] and their sizes).
sendRequest storage:ShuffleClient.md#fetchBlocks[requests ShuffleClient to fetch shuffle blocks] (from the host, the port, and the executor as defined in the input FetchRequest).

NOTE: ShuffleClient was defined when ShuffleBlockFetcherIterator <<creating-instance, was created>>.
sendRequest registers a BlockFetchingListener with ShuffleClient that:

- <<sendRequest-BlockFetchingListener-onBlockFetchSuccess, For every successfully fetched shuffle block>> adds it as SuccessFetchResult to the <<results, results>> internal queue.
- <<sendRequest-BlockFetchingListener-onBlockFetchFailure, For every shuffle block fetch failure>> adds it as FailureFetchResult to the <<results, results>> internal queue.
NOTE: sendRequest is used exclusively when ShuffleBlockFetcherIterator is requested to <<fetchUpToMaxBytes, fetch remote shuffle blocks (up to maxBytesInFlight)>>.
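The following is a simplified sketch (not verbatim Spark code; the exact fetchBlocks parameters and FetchResult constructors vary across Spark versions) of what sendRequest does:

[source, scala]
----
def sendRequest(req: FetchRequest): Unit = {
  bytesInFlight += req.size            // bytes requested but not yet received
  reqsInFlight  += 1                   // outstanding remote requests

  val address  = req.address                          // remote BlockManagerId
  val blockIds = req.blocks.map(_._1.toString).toArray

  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Enqueue a SuccessFetchResult so next() can consume it (skipped if isZombie)
        results.put(SuccessFetchResult(BlockId(blockId), address, buf))
      }
      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
        // Enqueue a FailureFetchResult; next() rethrows it as a FetchFailedException
        results.put(FailureFetchResult(BlockId(blockId), address, e))
      }
    })
}
----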
=== [[sendRequest-BlockFetchingListener-onBlockFetchSuccess]] onBlockFetchSuccess Callback
[source, scala]
----
onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit
----
Internally, onBlockFetchSuccess checks if ShuffleBlockFetcherIterator is still active (i.e. not a <<isZombie, zombie>>).

If so, onBlockFetchSuccess marks the input blockId as received (i.e. removes it from all the blocks to fetch as requested in <<sendRequest, sendRequest>>).

onBlockFetchSuccess adds the managed buf (as SuccessFetchResult) to the <<results, results>> internal queue.
You should see the following DEBUG message in the logs:
DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]
Regardless of zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:
TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms
=== [[sendRequest-BlockFetchingListener-onBlockFetchFailure]] onBlockFetchFailure Callback
[source, scala]
----
onBlockFetchFailure(blockId: String, e: Throwable): Unit
----

When onBlockFetchFailure is called, you should see the following ERROR message in the logs:
ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]
onBlockFetchFailure adds the block (as FailureFetchResult) to the <<results, results>> internal queue.
== [[throwFetchFailedException]] Throwing FetchFailedException (for ShuffleBlockId) -- throwFetchFailedException Internal Method

[source, scala]
----
throwFetchFailedException(
  blockId: BlockId,
  address: BlockManagerId,
  e: Throwable): Nothing
----
throwFetchFailedException throws a shuffle:FetchFailedException.md[FetchFailedException] when the input blockId is a ShuffleBlockId.

NOTE: throwFetchFailedException creates a FetchFailedException passing on the root cause of a failure, i.e. the input e.

Otherwise, throwFetchFailedException throws a SparkException:
Failed to get block [blockId], which is not a shuffle block
NOTE: throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the <<next, next element>>.
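The following is a simplified sketch (not verbatim Spark code; the exact FetchFailedException constructor varies across Spark versions) of the two cases described above:

[source, scala]
----
def throwFetchFailedException(
    blockId: BlockId,
    address: BlockManagerId,
    e: Throwable): Nothing = {
  blockId match {
    // A shuffle block: report a fetch failure so the scheduler can re-run the map stage
    case ShuffleBlockId(shuffleId, mapId, reduceId) =>
      throw new FetchFailedException(address, shuffleId, mapId, reduceId, e)
    // Anything else is unexpected here
    case _ =>
      throw new SparkException(
        s"Failed to get block $blockId, which is not a shuffle block", e)
  }
}
----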
== [[cleanup]] Releasing Resources -- cleanup Internal Method

[source, scala]
----
cleanup(): Unit
----
Internally, cleanup marks ShuffleBlockFetcherIterator a <<isZombie, zombie>>.

cleanup <<releaseCurrentResultBuffer, releases the currently-processed result buffer>>.

cleanup iterates over the <<results, results>> internal queue and, for every SuccessFetchResult, increments remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.
NOTE: cleanup is used as the task cleanup that <<initialize, initialize>> registers (and so is executed when the owning task completes).
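The following is a simplified outline (an assumed structure, not verbatim Spark code; the SuccessFetchResult fields are illustrative and shuffleMetrics stands for the task's shuffle read metrics) of the cleanup steps above:

[source, scala]
----
import scala.collection.JavaConverters._

def cleanup(): Unit = {
  synchronized { isZombie = true }   // remote callbacks will no longer enqueue results

  releaseCurrentResultBuffer()       // release the buffer currently being processed, if any

  // Drain the results queue and release every successfully fetched buffer,
  // updating the remote shuffle read metrics along the way
  results.asScala.foreach {
    case SuccessFetchResult(_, address, buf) =>
      if (address != blockManager.blockManagerId) {
        shuffleMetrics.incRemoteBytesRead(buf.size)
        shuffleMetrics.incRemoteBlocksFetched(1)
      }
      buf.release()
    case _ => // failures carry no buffer to release
  }
}
----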
== [[releaseCurrentResultBuffer]] Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) -- releaseCurrentResultBuffer Internal Method

[source, scala]
----
releaseCurrentResultBuffer(): Unit
----
releaseCurrentResultBuffer decrements the reference count of the buffer of the <<currentResult, currently-processed SuccessFetchResult>> (if there is one) and so releases it.

releaseCurrentResultBuffer then clears <<currentResult, currentResult>>.

NOTE: releaseCurrentResultBuffer is used when ShuffleBlockFetcherIterator is requested to <<cleanup, cleanup>> and when BufferReleasingInputStream closes.
== [[fetchLocalBlocks]] fetchLocalBlocks Internal Method

[source, scala]
----
fetchLocalBlocks(): Unit
----
fetchLocalBlocks ...FIXME

NOTE: fetchLocalBlocks is used when ShuffleBlockFetcherIterator is requested to <<initialize, initialize>>.
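Although the details are yet to be described here, the following is a simplified sketch (an assumed structure, not verbatim Spark code; localBlocks, results and shuffleMetrics stand for the iterator's internal state) of what fetching local blocks amounts to:

[source, scala]
----
def fetchLocalBlocks(): Unit = {
  for (blockId <- localBlocks) {
    try {
      val buf = blockManager.getBlockData(blockId)  // read the block from local storage
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()                                  // keep the buffer alive until consumed
      results.put(SuccessFetchResult(blockId, blockManager.blockManagerId, buf))
    } catch {
      case e: Exception =>
        // A missing or unreadable local block is reported like a failed remote fetch
        results.put(FailureFetchResult(blockId, blockManager.blockManagerId, e))
    }
  }
}
----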
== [[hasNext]] hasNext Method

[source, scala]
----
hasNext: Boolean
----

NOTE: hasNext is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#hasNext:Boolean++[Iterator Contract] to test whether this iterator can provide another element.
hasNext is positive (true) when the number of <<numBlocksProcessed, blocks already processed>> is below the <<numBlocksToFetch, total number of blocks to fetch>>.

Otherwise, hasNext is negative (false).
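In other words (a sketch, assuming the two counters from the <<internal-registries, internal registries>> are in scope), the test boils down to a single comparison:

[source, scala]
----
// There is another (BlockId, InputStream) pair to hand out as long as fewer
// blocks have been processed than were scheduled to be fetched
def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
----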
== [[splitLocalRemoteBlocks]] splitLocalRemoteBlocks Internal Method

[source, scala]
----
splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]
----
splitLocalRemoteBlocks ...FIXME
NOTE: splitLocalRemoteBlocks is used exclusively when ShuffleBlockFetcherIterator is requested to <<initialize, initialize>>.
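Although the details are yet to be described here, the following is a simplified sketch (an assumed structure, not verbatim Spark code; blocksByAddress, localBlocks and numBlocksToFetch stand for the iterator's state) of how blocks are split into local ones and remote FetchRequests:

[source, scala]
----
import scala.collection.mutable.ArrayBuffer

def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // Keep requests small enough that several of them can be in flight in parallel
  // without breaching maxBytesInFlight
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val remoteRequests = new ArrayBuffer[FetchRequest]

  for ((address, blockInfos) <- blocksByAddress) {
    if (address.executorId == blockManager.blockManagerId.executorId) {
      // Blocks on this executor are read directly from the local BlockManager later
      val nonEmpty = blockInfos.filter(_._2 != 0).map(_._1)
      localBlocks ++= nonEmpty
      numBlocksToFetch += nonEmpty.size
    } else {
      // Pack non-empty remote blocks into requests of roughly targetRequestSize
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      var curRequestSize = 0L
      for ((blockId, size) <- blockInfos if size > 0) {
        curBlocks += ((blockId, size))
        curRequestSize += size
        numBlocksToFetch += 1
        if (curRequestSize >= targetRequestSize) {
          remoteRequests += FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          curRequestSize = 0
        }
      }
      if (curBlocks.nonEmpty) remoteRequests += FetchRequest(address, curBlocks)
    }
  }
  remoteRequests
}
----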
== [[next]] Retrieving Next Element -- next Method

[source, scala]
----
next(): (BlockId, InputStream)
----

NOTE: next is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#next():A++[Iterator Contract] to produce the next element of this iterator.

next ...FIXME
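Although the details are yet to be described here, the following is a heavily simplified sketch (an assumed structure, not verbatim Spark code; it omits corruption detection and the buffer-releasing stream wrapper) of what next does:

[source, scala]
----
def next(): (BlockId, InputStream) = {
  numBlocksProcessed += 1
  val result = results.take()     // block until a local or remote fetch result arrives

  result match {
    case r @ SuccessFetchResult(blockId, address, buf) =>
      if (address != blockManager.blockManagerId) {
        bytesInFlight -= buf.size // the block is no longer in flight
      }
      currentResult = r
      fetchUpToMaxBytes()         // room has freed up: send more queued remote requests
      (blockId, streamWrapper(blockId, buf.createInputStream()))

    case FailureFetchResult(blockId, address, e) =>
      throwFetchFailedException(blockId, address, e)
  }
}
----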