

ShuffleBlockFetcherIterator is a Scala Iterator that fetches shuffle blocks (aka shuffle map outputs) from block managers.

ShuffleBlockFetcherIterator is created exclusively when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task.

ShuffleBlockFetcherIterator allows for iterating over fetched shuffle blocks as (BlockId, InputStream) pairs, so a caller can handle shuffle blocks in a pipelined fashion as they are received.

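ShuffleBlockFetcherIterator is exhausted (i.e. hasNext returns false) when the number of processed blocks (numBlocksProcessed) is at least the total number of blocks to fetch (numBlocksToFetch).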

ShuffleBlockFetcherIterator throttles remote fetches (fetchUpToMaxBytes) to avoid consuming too much memory.

[[internal-registries]]
.ShuffleBlockFetcherIterator's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| numBlocksProcessed | [[numBlocksProcessed]] The number of blocks fetched and consumed (by next).

| numBlocksToFetch a| [[numBlocksToFetch]] Total number of blocks to fetch and consume.

ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.

numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to split local and remote blocks (splitLocalRemoteBlocks), which prints it out as the following INFO message to the logs:

Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks

| [[results]] results | Internal FIFO blocking queue (using Java's java.util.concurrent.LinkedBlockingQueue) to hold FetchResult remote and local fetch results.

Used in:

  1. next to take one FetchResult off the queue,

  2. sendRequest to put SuccessFetchResult or FailureFetchResult remote fetch results (as part of the BlockFetchingListener callback),

  3. fetchLocalBlocks (similarly to sendRequest) to put local fetch results,

  4. cleanup to release managed buffers for SuccessFetchResult results.

| [[maxBytesInFlight]] maxBytesInFlight | The maximum size (in bytes) of all the remote shuffle blocks to fetch at the same time.

Set when ShuffleBlockFetcherIterator is created.

| [[maxReqsInFlight]] maxReqsInFlight | The maximum number of remote requests to fetch shuffle blocks at any given point.

Set when ShuffleBlockFetcherIterator is created.

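| [[bytesInFlight]] bytesInFlight | The total size (in bytes) of remote shuffle blocks in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented every time a remote fetch request is sent (sendRequest) and decremented every time next takes a remote fetch result off the results queue.

ShuffleBlockFetcherIterator makes sure that bytesInFlight stays below maxBytesInFlight every time it fetches remote shuffle blocks (fetchUpToMaxBytes).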

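| [[reqsInFlight]] reqsInFlight | The number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented every time a remote fetch request is sent (sendRequest) and decremented every time next finishes processing the results of a remote fetch request.

ShuffleBlockFetcherIterator makes sure that reqsInFlight stays below maxReqsInFlight every time it fetches remote shuffle blocks (fetchUpToMaxBytes).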

| [[isZombie]] isZombie | Flag whether ShuffleBlockFetcherIterator is no longer active (a zombie). It is disabled, i.e. false, when ShuffleBlockFetcherIterator is created.

When enabled (i.e. when cleanup runs after the task using ShuffleBlockFetcherIterator finishes), the BlockFetchingListener (registered in sendRequest) no longer adds fetched remote shuffle blocks to the results internal queue.

| [[currentResult]] currentResult | The currently-processed SuccessFetchResult.

Set when ShuffleBlockFetcherIterator returns the next (BlockId, InputStream) tuple (next) and reset when releaseCurrentResultBuffer runs (e.g. on cleanup).
|===


Enable ERROR, WARN, INFO, DEBUG or TRACE logging level for the ShuffleBlockFetcherIterator logger to see what happens inside ShuffleBlockFetcherIterator.

Add the following line to conf/

Refer to Logging.

== [[fetchUpToMaxBytes]] fetchUpToMaxBytes Method
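The bytesInFlight and reqsInFlight invariants described in the internal registries table can be sketched as a small throttle. This is a minimal sketch, not Spark's code: FetchRequest (with string addresses), FetchThrottle, sent, canSendNext and onRequestDone are hypothetical names. As a choice made in this sketch, a request is also allowed out when nothing is in flight, so that a single request larger than maxBytesInFlight cannot stall forever.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a remote fetch request: an address plus (blockId, size) pairs.
final case class FetchRequest(address: String, blocks: Seq[(String, Long)]) {
  val size: Long = blocks.map(_._2).sum
}

// Sketch of throttled remote fetching; names other than the counters are illustrative.
final class FetchThrottle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
  val fetchRequests = mutable.Queue.empty[FetchRequest]
  val sent = mutable.ArrayBuffer.empty[FetchRequest]
  var bytesInFlight = 0L
  var reqsInFlight = 0

  // The next queued request may go out if nothing is in flight, or if sending it
  // keeps both counters within their limits.
  private def canSendNext: Boolean =
    fetchRequests.nonEmpty &&
      (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          bytesInFlight + fetchRequests.head.size <= maxBytesInFlight))

  def fetchUpToMaxBytes(): Unit =
    while (canSendNext) sendRequest(fetchRequests.dequeue())

  private def sendRequest(req: FetchRequest): Unit = {
    bytesInFlight += req.size
    reqsInFlight += 1
    sent += req
  }

  // Called once the results of a request have been consumed.
  def onRequestDone(req: FetchRequest): Unit = {
    bytesInFlight -= req.size
    reqsInFlight -= 1
  }
}

val throttle = new FetchThrottle(maxBytesInFlight = 100L, maxReqsInFlight = 2)
throttle.fetchRequests ++= Seq(
  FetchRequest("host-a:7337", Seq("shuffle_0_0_0" -> 40L)),
  FetchRequest("host-b:7337", Seq("shuffle_0_1_0" -> 50L)),
  FetchRequest("host-c:7337", Seq("shuffle_0_2_0" -> 30L)))
throttle.fetchUpToMaxBytes()                 // sends the first two requests (90 bytes, 2 requests)
throttle.onRequestDone(throttle.sent.head)   // 50 bytes and 1 request left in flight
throttle.fetchUpToMaxBytes()                 // room again: the third request goes out
```

The third request waits only because of maxReqsInFlight; once the first request is done, both limits allow it.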


== Creating Instance

When created, ShuffleBlockFetcherIterator takes the following:

  • [[context]] TaskContext
  • [[shuffleClient]] ShuffleClient
  • [[blockManager]] BlockManager
  • [[blocksByAddress]] Blocks to fetch per BlockManager (as Seq[(BlockManagerId, Seq[(BlockId, Long)])])
  • [[streamWrapper]] Function to wrap the returned input stream (as (BlockId, InputStream) => InputStream)
  • maxBytesInFlight: the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by the spark.reducer.maxSizeInFlight Spark property)
  • maxReqsInFlight: the maximum number of remote requests to fetch blocks at any given point (controlled by the spark.reducer.maxReqsInFlight Spark property)
  • [[maxBlocksInFlightPerAddress]] maxBlocksInFlightPerAddress
  • [[maxReqSizeShuffleToMem]] maxReqSizeShuffleToMem
  • [[detectCorrupt]] detectCorrupt flag to detect any corruption in fetched blocks (controlled by the spark.shuffle.detectCorrupt Spark property)

== [[initialize]] Initializing ShuffleBlockFetcherIterator -- initialize Internal Method

[source, scala]

initialize(): Unit

initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.

Internally, initialize registers a TaskCompletionListener that calls cleanup right after the task finishes.

initialize splits local and remote blocks (splitLocalRemoteBlocks).

initialize registers the new remote fetch requests (with the fetchRequests internal registry).

As ShuffleBlockFetcherIterator is in the initialization phase, initialize makes sure that the reqsInFlight and bytesInFlight internal counters are both 0. Otherwise, initialize throws an exception.

initialize fetches shuffle blocks from remote BlockManagers (fetchUpToMaxBytes).

You should see the following INFO message in the logs:

INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms

initialize fetches local blocks (fetchLocalBlocks).

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Got local blocks in  [time] ms

NOTE: initialize is used exclusively when ShuffleBlockFetcherIterator is created.
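The order of the initialization steps above can be sketched with stubs that only record their names. This is a minimal sketch under stated assumptions: FakeTaskContext, InitSketch, steps and markTaskCompleted are hypothetical, and the stub methods do no real fetching. It shows that cleanup is registered first but only runs once the task completes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for TaskContext that only runs completion callbacks.
final class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Records the order of the initialization steps; the step bodies are stubs.
final class InitSketch(context: FakeTaskContext) {
  val steps = ArrayBuffer.empty[String]
  val fetchRequests = ArrayBuffer.empty[String]
  var reqsInFlight = 0
  var bytesInFlight = 0L

  private def cleanup(): Unit = steps += "cleanup"
  private def splitLocalRemoteBlocks(): Seq[String] = { steps += "splitLocalRemoteBlocks"; Seq("req-1", "req-2") }
  private def fetchUpToMaxBytes(): Unit = steps += "fetchUpToMaxBytes"
  private def fetchLocalBlocks(): Unit = steps += "fetchLocalBlocks"

  def initialize(): Unit = {
    // Release resources once the task finishes.
    context.addTaskCompletionListener(() => cleanup())
    // Split the blocks and register the remote fetch requests.
    fetchRequests ++= splitLocalRemoteBlocks()
    // Nothing may be in flight before the first remote fetch.
    assert(reqsInFlight == 0 && bytesInFlight == 0, "expected no requests or bytes in flight")
    // Start remote fetches, then read local blocks.
    fetchUpToMaxBytes()
    fetchLocalBlocks()
  }
}

val context = new FakeTaskContext
val iter = new InitSketch(context)
iter.initialize()
context.markTaskCompleted()   // runs the registered cleanup
```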

== [[sendRequest]] Sending Remote Shuffle Block Fetch Request -- sendRequest Internal Method

[source, scala]

sendRequest(req: FetchRequest): Unit

Internally, when sendRequest runs, you should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]

sendRequest increments the bytesInFlight and reqsInFlight internal counters.

NOTE: The input FetchRequest contains the remote BlockManagerId address and the shuffle blocks to fetch (as a sequence of BlockIds and their sizes).

sendRequest requests ShuffleClient to fetch shuffle blocks (from the host, the port, and the executor as defined in the input FetchRequest).

NOTE: ShuffleClient was defined when ShuffleBlockFetcherIterator was created.

sendRequest registers a BlockFetchingListener with ShuffleClient that:

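  1. When a block has been fetched (onBlockFetchSuccess), adds it as SuccessFetchResult to the results internal queue (unless ShuffleBlockFetcherIterator is a zombie).

  2. When a block fetch has failed (onBlockFetchFailure), adds it as FailureFetchResult to the results internal queue.

NOTE: sendRequest is used exclusively when ShuffleBlockFetcherIterator is requested to fetch remote shuffle blocks (fetchUpToMaxBytes).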

=== [[sendRequest-BlockFetchingListener-onBlockFetchSuccess]] onBlockFetchSuccess Callback

[source, scala]

onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit

Internally, onBlockFetchSuccess checks whether ShuffleBlockFetcherIterator is a zombie (isZombie) and does further processing only if it is not.

onBlockFetchSuccess marks the input blockId as received (i.e. removes it from the remaining blocks to fetch as requested in sendRequest).

onBlockFetchSuccess adds the managed buf (as SuccessFetchResult) to the results internal queue.

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]

Regardless of the zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:

TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms

=== [[sendRequest-BlockFetchingListener-onBlockFetchFailure]] onBlockFetchFailure Callback

[source, scala]

onBlockFetchFailure(blockId: String, e: Throwable): Unit

When onBlockFetchFailure is called, you should see the following ERROR message in the logs:

ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]

onBlockFetchFailure adds the block (as FailureFetchResult) to the results internal queue.
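The two callbacks can be sketched as a listener that feeds a LinkedBlockingQueue and drops late successes once the iterator is a zombie. This is a minimal sketch, not Spark's classes: FetchResult, SuccessFetchResult, FailureFetchResult and BlockFetchingListener are simplified hypothetical versions (byte arrays instead of ManagedBuffer), and FetcherSketch and listenerFor are illustrative names.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical, simplified fetch results (the real ones carry more fields).
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, buf: Array[Byte]) extends FetchResult
final case class FailureFetchResult(blockId: String, e: Throwable) extends FetchResult

// Hypothetical listener interface with the two callbacks described above.
trait BlockFetchingListener {
  def onBlockFetchSuccess(blockId: String, buf: Array[Byte]): Unit
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}

final class FetcherSketch {
  val results = new LinkedBlockingQueue[FetchResult]()
  @volatile var isZombie = false

  // One listener per request; blockIds are the blocks that request asked for.
  def listenerFor(blockIds: Seq[String]): BlockFetchingListener = new BlockFetchingListener {
    private val remainingBlocks = mutable.HashSet.empty[String] ++= blockIds

    def onBlockFetchSuccess(blockId: String, buf: Array[Byte]): Unit =
      FetcherSketch.this.synchronized {
        // After cleanup (zombie mode), late arrivals are not queued.
        if (!isZombie) {
          remainingBlocks -= blockId
          results.put(SuccessFetchResult(blockId, buf))
        }
      }

    // Failures are always queued.
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
      results.put(FailureFetchResult(blockId, e))
  }
}

val fetcher = new FetcherSketch
val listener = fetcher.listenerFor(Seq("shuffle_0_0_0", "shuffle_0_1_0"))
listener.onBlockFetchSuccess("shuffle_0_0_0", Array[Byte](1, 2))
fetcher.isZombie = true                                         // e.g. the task finished and cleanup ran
listener.onBlockFetchSuccess("shuffle_0_1_0", Array[Byte](3))   // dropped
listener.onBlockFetchFailure("shuffle_0_1_0", new RuntimeException("connection reset"))
```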

== [[throwFetchFailedException]] Throwing FetchFailedException (for ShuffleBlockId) -- throwFetchFailedException Internal Method

[source, scala]

throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable): Nothing

throwFetchFailedException throws a FetchFailedException when the input blockId is a ShuffleBlockId.

NOTE: throwFetchFailedException creates a FetchFailedException passing on the root cause of a failure, i.e. the input e.

Otherwise, throwFetchFailedException throws a SparkException:

Failed to get block [blockId], which is not a shuffle block

NOTE: throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the next element (next).
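The dispatch on the block type can be sketched with a pattern match. This is a minimal sketch: the BlockId hierarchy (ShuffleBlockId, RDDBlockId), FetchFailedException and SparkException below are hypothetical stand-ins with simplified constructors, and the address parameter of the documented method is omitted.

```scala
import scala.util.Try

// Hypothetical, simplified block ids (not Spark's classes).
sealed trait BlockId { def name: String }
final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}
final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  def name: String = s"rdd_${rddId}_${splitIndex}"
}

// Hypothetical exceptions standing in for Spark's FetchFailedException and SparkException.
final class FetchFailedException(val blockId: ShuffleBlockId, cause: Throwable)
  extends Exception(s"Failed to fetch ${blockId.name}", cause)
final class SparkException(message: String, cause: Throwable) extends Exception(message, cause)

// The address parameter of the documented method is omitted here.
def throwFetchFailedException(blockId: BlockId, e: Throwable): Nothing = blockId match {
  case sb: ShuffleBlockId =>
    // A shuffle block: signal a fetch failure and keep e as the root cause.
    throw new FetchFailedException(sb, e)
  case other =>
    throw new SparkException(s"Failed to get block ${other.name}, which is not a shuffle block", e)
}

val cause = new java.io.IOException("connection reset")
val shuffleFailure = Try(throwFetchFailedException(ShuffleBlockId(0, 1, 2), cause)).failed.get
val otherFailure = Try(throwFetchFailedException(RDDBlockId(3, 4), cause)).failed.get
```

The return type Nothing tells the compiler that the method never returns normally, so callers can use it in any expression position.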

== [[cleanup]] Releasing Resources -- cleanup Internal Method

[source, scala]

cleanup(): Unit

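Internally, cleanup marks ShuffleBlockFetcherIterator a zombie (i.e. enables isZombie).

cleanup releases the current result buffer (releaseCurrentResultBuffer).

cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and remote blocks fetched shuffle task metrics, and eventually releases the managed buffer.

NOTE: cleanup is used when the task using ShuffleBlockFetcherIterator finishes (as the TaskCompletionListener registered in initialize).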
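The draining of the results queue can be sketched as follows. This is a minimal sketch under stated assumptions: CountedBuffer (a reference-counted stand-in for ManagedBuffer), ShuffleReadMetrics and CleanupSketch are hypothetical, and the releaseCurrentResultBuffer step is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical reference-counted buffer standing in for ManagedBuffer.
final class CountedBuffer(val size: Long) {
  var refCnt = 1
  def release(): Unit = refCnt -= 1
}

sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, buf: CountedBuffer) extends FetchResult
final case class FailureFetchResult(blockId: String, e: Throwable) extends FetchResult

// Hypothetical stand-in for the task's shuffle read metrics.
final class ShuffleReadMetrics {
  var remoteBytesRead = 0L
  var remoteBlocksFetched = 0L
}

final class CleanupSketch(results: LinkedBlockingQueue[FetchResult], metrics: ShuffleReadMetrics) {
  @volatile var isZombie = false

  def cleanup(): Unit = {
    // From now on, fetch callbacks stop queueing results.
    isZombie = true
    // Account for and release every buffer still sitting in the queue.
    val iter = results.iterator()
    while (iter.hasNext) {
      iter.next() match {
        case SuccessFetchResult(_, buf) =>
          metrics.remoteBytesRead += buf.size
          metrics.remoteBlocksFetched += 1
          buf.release()
        case _: FailureFetchResult => // no buffer to release
      }
    }
  }
}

val queue = new LinkedBlockingQueue[FetchResult]()
val b1 = new CountedBuffer(10L)
val b2 = new CountedBuffer(5L)
queue.put(SuccessFetchResult("shuffle_0_0_0", b1))
queue.put(FailureFetchResult("shuffle_0_1_0", new RuntimeException("boom")))
queue.put(SuccessFetchResult("shuffle_0_2_0", b2))
val metrics = new ShuffleReadMetrics
val sketch = new CleanupSketch(queue, metrics)
sketch.cleanup()
```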

== [[releaseCurrentResultBuffer]] Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) -- releaseCurrentResultBuffer Internal Method

[source, scala]

releaseCurrentResultBuffer(): Unit

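releaseCurrentResultBuffer decrements the reference count of the buffer of the current SuccessFetchResult (currentResult), if there is any.

releaseCurrentResultBuffer releases the currentResult reference (i.e. sets it to null).

NOTE: releaseCurrentResultBuffer is used when ShuffleBlockFetcherIterator is requested to clean up (cleanup) and when BufferReleasingInputStream is closed.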
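Because releaseCurrentResultBuffer can be reached from both cleanup and a closing stream, resetting the reference is what keeps the buffer from being released twice. This is a minimal sketch with hypothetical CountedBuffer, SuccessFetchResult and CurrentResultHolder types; it is not Spark's implementation.

```scala
// Hypothetical reference-counted buffer standing in for ManagedBuffer.
final class CountedBuffer {
  var refCnt = 1
  def release(): Unit = refCnt -= 1
}

final case class SuccessFetchResult(blockId: String, buf: CountedBuffer)

final class CurrentResultHolder {
  // The result whose input stream the caller is reading, if any.
  var currentResult: SuccessFetchResult = null

  def releaseCurrentResultBuffer(): Unit = {
    // Decrement the buffer's reference count only if there is a current result...
    if (currentResult != null) currentResult.buf.release()
    // ...and drop the reference so that a second call is a no-op.
    currentResult = null
  }
}

val buffer = new CountedBuffer
val holder = new CurrentResultHolder
holder.currentResult = SuccessFetchResult("shuffle_0_0_0", buffer)
holder.releaseCurrentResultBuffer()   // e.g. the stream was closed
holder.releaseCurrentResultBuffer()   // e.g. cleanup runs afterwards
```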

== [[fetchLocalBlocks]] fetchLocalBlocks Internal Method

[source, scala]

fetchLocalBlocks(): Unit


NOTE: fetchLocalBlocks is used when ShuffleBlockFetcherIterator is requested to initialize.

== [[hasNext]] hasNext Method

[source, scala]

hasNext: Boolean

NOTE: hasNext is part of Scala's Iterator contract to test whether this iterator can provide another element.

hasNext is positive (true) when numBlocksProcessed is less than numBlocksToFetch.

Otherwise, hasNext is negative (false).

== [[splitLocalRemoteBlocks]] splitLocalRemoteBlocks Internal Method

[source, scala]

splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]


NOTE: splitLocalRemoteBlocks is used exclusively when ShuffleBlockFetcherIterator is requested to initialize.
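Based on what this page says about numBlocksToFetch and the "Getting ... non-empty blocks" INFO message, the split can be sketched as a partition of blocksByAddress into local block ids and remote fetch requests. This is a minimal sketch, not Spark's algorithm: SplitSketch, the string addresses and the one-request-per-remote-address grouping are simplifying assumptions, and println stands in for the INFO log.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified request: addresses and block ids are plain strings.
final case class FetchRequest(address: String, blocks: Seq[(String, Long)])

final class SplitSketch(localAddress: String, blocksByAddress: Seq[(String, Seq[(String, Long)])]) {
  val localBlocks = ArrayBuffer.empty[String]
  var numBlocksToFetch = 0

  def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    val remoteRequests = ArrayBuffer.empty[FetchRequest]
    var totalBlocks = 0
    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      // Empty blocks (size 0) have nothing to fetch.
      val nonEmpty = blockInfos.filter { case (_, size) => size > 0 }
      numBlocksToFetch += nonEmpty.size
      if (address == localAddress) localBlocks ++= nonEmpty.map(_._1)
      // Simplification: one request per remote address.
      else if (nonEmpty.nonEmpty) remoteRequests += FetchRequest(address, nonEmpty)
    }
    // Stand-in for the INFO log message.
    println(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
    remoteRequests
  }
}

val splitter = new SplitSketch("exec-1", Seq(
  "exec-1" -> Seq("shuffle_0_0_0" -> 10L, "shuffle_0_1_0" -> 0L),
  "exec-2" -> Seq("shuffle_0_2_0" -> 5L, "shuffle_0_3_0" -> 7L),
  "exec-3" -> Seq("shuffle_0_4_0" -> 0L)))
val requests = splitter.splitLocalRemoteBlocks()
// prints: Getting 3 non-empty blocks out of 5 blocks
```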

== [[next]] Retrieving Next Element -- next Method

[source, scala]

next(): (BlockId, InputStream)

NOTE: next is part of Scala's Iterator contract to produce the next element of this iterator.
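Together with hasNext, next can be sketched as a consumer of the results queue. The blocking take() is what makes the pipelining work: the caller handles each block as soon as a fetch result arrives. This is a minimal sketch with hypothetical, simplified FetchResult types and a hypothetical ResultIterator class. In the real iterator, fetch callbacks on other threads fill the queue and failures go through throwFetchFailedException; here the queue is filled up front and failures become a plain RuntimeException.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified fetch results.
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult
final case class FailureFetchResult(blockId: String, e: Throwable) extends FetchResult

final class ResultIterator(results: LinkedBlockingQueue[FetchResult], numBlocksToFetch: Int)
  extends Iterator[(String, InputStream)] {

  private var numBlocksProcessed = 0

  // Exhausted once every block to fetch has been processed.
  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  override def next(): (String, InputStream) = {
    if (!hasNext) throw new NoSuchElementException("no more shuffle blocks")
    // Blocks until a result (remote or local) is available.
    val result = results.take()
    numBlocksProcessed += 1
    result match {
      case SuccessFetchResult(blockId, data) => (blockId, new ByteArrayInputStream(data))
      case FailureFetchResult(blockId, e) =>
        // Simplification of the throwFetchFailedException path.
        throw new RuntimeException(s"Failed to get block $blockId", e)
    }
  }
}

val results = new LinkedBlockingQueue[FetchResult]()
results.put(SuccessFetchResult("shuffle_0_0_0", "hello".getBytes("UTF-8")))
results.put(SuccessFetchResult("shuffle_0_1_0", "world!".getBytes("UTF-8")))
val sizes = new ResultIterator(results, numBlocksToFetch = 2)
  .map { case (id, in) => id -> in.available() }
  .toList
```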


Last update: 2020-11-21