

ShuffleBlockFetcherIterator is an Iterator[(BlockId, InputStream)] (Scala) that fetches shuffle blocks from local or remote BlockManagers (and makes them available as an InputStream).

ShuffleBlockFetcherIterator allows for a synchronous iteration over shuffle blocks so a caller can handle them in a pipelined fashion as they are received.

ShuffleBlockFetcherIterator is exhausted (and can provide no elements) when the number of blocks already processed is at least the total number of blocks to fetch.

ShuffleBlockFetcherIterator throttles the remote fetches to avoid consuming too much memory.

Creating Instance

ShuffleBlockFetcherIterator takes the following to be created:

While being created, ShuffleBlockFetcherIterator initializes itself.

ShuffleBlockFetcherIterator is created when:


initialize(): Unit

initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.

Internally, initialize uses the TaskContext to register the ShuffleFetchCompletionListener (to clean up).

initialize then calls partitionBlocksByFetchMode.



partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest]



  address: BlockManagerId,
  blockInfos: Seq[(BlockId, Long, Int)],
  collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit



  curBlocks: Seq[FetchBlockInfo],
  address: BlockManagerId,
  isLast: Boolean,
  collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo]



fetchUpToMaxBytes(): Unit


fetchUpToMaxBytes is used when:

Sending Remote Shuffle Block Fetch Request

sendRequest(
  req: FetchRequest): Unit

sendRequest prints out the following DEBUG message to the logs:

Sending request for [n] blocks ([size]) from [hostPort]

sendRequest adds the size of the blocks in the FetchRequest to the bytesInFlight internal counter and increments the reqsInFlight internal counter.

sendRequest requests the ShuffleClient to fetch the blocks with a new BlockFetchingListener (and passes this ShuffleBlockFetcherIterator as the DownloadFileManager when the size of the blocks in the FetchRequest is higher than maxReqSizeShuffleToMem).
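The bookkeeping described above can be sketched as follows. This is a minimal model, not Spark's code: `SendRequestSketch`, `Sender`, `Destination` and the simplified `FetchRequest` are hypothetical stand-ins, and the only behavior taken from the text is that both counters grow on every request and that requests larger than maxReqSizeShuffleToMem are handed to the DownloadFileManager.

```scala
object SendRequestSketch {
  // Hypothetical, simplified request: only the block sizes are modelled.
  final case class FetchRequest(blockSizes: Seq[Long]) {
    def size: Long = blockSizes.sum
  }

  // Illustrative marker for how the fetched bytes would be received.
  sealed trait Destination
  case object InMemory extends Destination
  case object ViaDownloadFileManager extends Destination

  final class Sender(maxReqSizeShuffleToMem: Long) {
    private var bytesInFlight = 0L
    private var reqsInFlight = 0

    def inFlight: (Long, Int) = (bytesInFlight, reqsInFlight)

    def sendRequest(req: FetchRequest): Destination = {
      // Account for the request before it goes out.
      bytesInFlight += req.size
      reqsInFlight += 1
      // Large requests go through the file manager instead of memory.
      if (req.size > maxReqSizeShuffleToMem) ViaDownloadFileManager else InMemory
    }
  }
}
```

In this sketch the counters are never decremented; per the sections on bytesInFlight and reqsInFlight, that happens when results are consumed in next.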

sendRequest is used when:


sendRequest creates a new BlockFetchingListener to be notified about successes or failures of shuffle block fetch requests.


On onBlockFetchSuccess, the BlockFetchingListener (unless this ShuffleBlockFetcherIterator is a zombie) adds a SuccessFetchResult to the results registry and prints out the following DEBUG message to the logs:

remainingBlocks: [remainingBlocks]

In the end, onBlockFetchSuccess prints out the following TRACE message to the logs:

Got remote block [blockId] after [time]


On onBlockFetchFailure the BlockFetchingListener adds a FailureFetchResult to the results registry and prints out the following ERROR message to the logs:

Failed to get block(s) from [host]:[port]
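The two callbacks can be modelled with a small sketch. All types here are simplified, hypothetical stand-ins (block IDs are plain strings, a result carries only a size), and logging is left out. The sketch only shows successes being recorded while the fetcher is not a zombie and failures always being recorded.

```scala
import java.util.concurrent.LinkedBlockingQueue

object FetchListenerSketch {
  // Simplified result types; the real ones carry more fields.
  sealed trait FetchResult
  final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
  final case class FailureFetchResult(blockId: String, error: Throwable) extends FetchResult

  final class Fetcher {
    val results = new LinkedBlockingQueue[FetchResult]()
    private var zombie = false

    def markZombie(): Unit = synchronized { zombie = true }

    // Successes are only recorded while the fetcher is still active.
    def onBlockFetchSuccess(blockId: String, size: Long): Unit = synchronized {
      if (!zombie) results.put(SuccessFetchResult(blockId, size))
    }

    // Failures are always recorded so the consumer can react to them.
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit = synchronized {
      results.put(FailureFetchResult(blockId, e))
    }
  }
}
```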


results: LinkedBlockingQueue[FetchResult]

ShuffleBlockFetcherIterator uses an internal FIFO blocking queue (Java) of FetchResults.

results is used for fetching the next element.

For remote blocks, FetchResults are added in sendRequest:

For local blocks, FetchResults are added in fetchLocalBlocks:

For host-local blocks, FetchResults are added in fetchHostLocalBlock:

FailureFetchResults can also be added in fetchHostLocalBlocks.

results is cleaned up in cleanup.


hasNext: Boolean

hasNext is part of the Iterator (Scala) abstraction (to test whether this iterator can provide another element).

hasNext is true when numBlocksProcessed is below numBlocksToFetch.

Retrieving Next Element

next(): (BlockId, InputStream)

next increments the numBlocksProcessed registry.

next takes (and removes) the head of the results queue.

next requests the ShuffleReadMetricsReporter to incFetchWaitTime.


next throws a NoSuchElementException if there is no element left.

next is part of the Iterator (Scala) abstraction (to produce the next element of this iterator).
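The interplay of hasNext, next and the results queue can be illustrated with a minimal sketch. `QueueBackedIterator` is a hypothetical stand-in that keeps only the two counters and the blocking queue; metrics, failure handling and stream wrapping are omitted.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified stand-in: all names below are illustrative only.
final class QueueBackedIterator[A](numBlocksToFetch: Int, results: LinkedBlockingQueue[A])
    extends Iterator[A] {

  // Number of elements already handed out to the caller.
  private var numBlocksProcessed = 0

  // Another element is available while fewer blocks were processed than expected.
  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("no blocks left")
    numBlocksProcessed += 1
    // take() removes the head of the queue, blocking until a result arrives.
    results.take()
  }
}

object QueueBackedIteratorDemo {
  def run(): List[String] = {
    val results = new LinkedBlockingQueue[String]()
    results.put("block-0")
    results.put("block-1")
    new QueueBackedIterator[String](2, results).toList
  }
}
```

Because take() blocks, a caller of next simply waits until a fetch (running elsewhere) puts a result on the queue, which is what allows pipelined consumption.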


numBlocksProcessed is the number of blocks fetched and consumed.


numBlocksToFetch is the total number of blocks to fetch and consume.

ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.

numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to partitionBlocksByFetchMode, which prints it out in the following INFO message to the logs:

Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks


releaseCurrentResultBuffer(): Unit


releaseCurrentResultBuffer is used when:

  • ShuffleBlockFetcherIterator is requested to cleanup
  • BufferReleasingInputStream is requested to close


ShuffleBlockFetcherIterator creates a ShuffleFetchCompletionListener when created.

ShuffleFetchCompletionListener is used in initialize and toCompletionIterator.

Cleaning Up

cleanup(): Unit

cleanup marks this ShuffleBlockFetcherIterator as a zombie.

cleanup releases the current result buffer.

cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.
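The draining step can be sketched as below. This is a simplified model under stated assumptions: `Buffer`, `SuccessResult`, `FailureResult` and the two metric counters are hypothetical stand-ins, and releasing the current result buffer is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CleanupSketch {
  // Hypothetical buffer with an explicit release, standing in for a managed buffer.
  final class Buffer(val size: Long) {
    private var released = false
    def release(): Unit = { released = true }
    def isReleased: Boolean = released
  }

  sealed trait FetchResult
  final case class SuccessResult(buf: Buffer) extends FetchResult
  final case class FailureResult(error: Throwable) extends FetchResult

  final class Fetcher(val results: LinkedBlockingQueue[FetchResult]) {
    private var zombie = false
    var remoteBytesRead = 0L
    var remoteBlocksFetched = 0L

    def isZombie: Boolean = synchronized { zombie }

    def cleanup(): Unit = {
      synchronized { zombie = true }
      // Drain whatever was fetched but never consumed; poll() returns null when empty.
      var next = results.poll()
      while (next != null) {
        next match {
          case SuccessResult(buf) =>
            remoteBytesRead += buf.size
            remoteBlocksFetched += 1
            buf.release()
          case _ => () // nothing to release for failures
        }
        next = results.poll()
      }
    }
  }
}
```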


bytesInFlight is the number of bytes of fetched remote shuffle blocks in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented on every sendRequest and decremented on every next.

ShuffleBlockFetcherIterator maintains the invariant that bytesInFlight stays below maxBytesInFlight on every remote shuffle block fetch.


reqsInFlight is the number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented on every sendRequest and decremented on every next.

ShuffleBlockFetcherIterator maintains the invariant that reqsInFlight stays below maxReqsInFlight on every remote shuffle block fetch.
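The two in-flight limits can be illustrated with a small throttling sketch. It is a model under assumptions, not Spark's code: `Throttle`, `Request`, `sendUpToLimits` and `completed` are hypothetical names. Letting a request through whenever nothing is in flight is a choice made in this sketch so that a single oversized request cannot stall the queue.

```scala
import scala.collection.mutable

object FetchThrottleSketch {
  // Hypothetical, simplified request: only the total size matters here.
  final case class Request(size: Long)

  final class Throttle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
    private var bytesInFlight = 0L
    private var reqsInFlight = 0
    private val queue = mutable.Queue.empty[Request]

    def inFlight: (Long, Int) = (bytesInFlight, reqsInFlight)

    def enqueue(req: Request): Unit = queue.enqueue(req)

    // Sends queued requests in FIFO order while both limits allow it.
    def sendUpToLimits(): List[Request] = {
      val sent = List.newBuilder[Request]
      while (queue.nonEmpty && canSend(queue.head)) {
        val req = queue.dequeue()
        bytesInFlight += req.size
        reqsInFlight += 1
        sent += req
      }
      sent.result()
    }

    // Called once the response to a request was consumed.
    def completed(req: Request): Unit = {
      bytesInFlight -= req.size
      reqsInFlight -= 1
    }

    private def canSend(req: Request): Boolean =
      bytesInFlight == 0L ||
        (reqsInFlight + 1 <= maxReqsInFlight && bytesInFlight + req.size <= maxBytesInFlight)
  }
}
```

For example, with limits of 100 bytes and 2 requests, queued requests of 60, 60 and 10 bytes go out as one request first, and the remaining two only after the first has completed.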


The zombie flag controls whether ShuffleBlockFetcherIterator is still active and records SuccessFetchResults on successful shuffle block fetches.

Starts as false (disabled) when ShuffleBlockFetcherIterator is created.

Enabled (true) in cleanup.

When enabled, registerTempFileToClean is a no-op.


ShuffleBlockFetcherIterator is a DownloadFileManager.


throwFetchFailedException(
  blockId: BlockId,
  mapIndex: Int,
  address: BlockManagerId,
  e: Throwable,
  message: Option[String] = None): Nothing

throwFetchFailedException takes the message (if defined) or uses the message of the given Throwable.

In the end, throwFetchFailedException throws a FetchFailedException if the BlockId is either a ShuffleBlockId or a ShuffleBlockBatchId. Otherwise, throwFetchFailedException throws a SparkException with the following message:

Failed to get block [blockId], which is not a shuffle block
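The branching on the kind of block can be sketched with local stand-in types. Everything below is redefined for illustration and is not Spark's class hierarchy: the block ID case classes are reduced, the parameter list is shortened, and `NotShuffleBlockException` is a hypothetical placeholder for the SparkException mentioned above.

```scala
object FetchFailureSketch {
  // Local, simplified stand-ins for the block ID hierarchy.
  sealed trait BlockId
  final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
  final case class ShuffleBlockBatchId(
      shuffleId: Int,
      mapId: Long,
      startReduceId: Int,
      endReduceId: Int) extends BlockId
  final case class OtherBlockId(name: String) extends BlockId

  final class FetchFailedException(val blockId: BlockId, message: String, cause: Throwable)
      extends Exception(message, cause)

  // Hypothetical placeholder for the non-shuffle-block error.
  final class NotShuffleBlockException(message: String, cause: Throwable)
      extends Exception(message, cause)

  def throwFetchFailedException(
      blockId: BlockId,
      e: Throwable,
      message: Option[String] = None): Nothing = {
    // Prefer the explicit message, fall back to the cause's message.
    val msg = message.getOrElse(e.getMessage)
    blockId match {
      case _: ShuffleBlockId | _: ShuffleBlockBatchId =>
        throw new FetchFailedException(blockId, msg, e)
      case _ =>
        throw new NotShuffleBlockException(
          s"Failed to get block $blockId, which is not a shuffle block", e)
    }
  }
}
```

The return type Nothing documents that the method never returns normally, so it can be used in any expression position.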

throwFetchFailedException is used when:

  • ShuffleBlockFetcherIterator is requested to next
  • BufferReleasingInputStream is requested to tryOrFetchFailedException


Enable ALL logging level for logger to see what happens inside.

Add the following line to conf/

Refer to Logging.