
ShuffleBlockFetcherIterator

ShuffleBlockFetcherIterator is an Iterator[(BlockId, InputStream)] (Scala) that fetches shuffle blocks from local or remote BlockManagers (and makes them available as an InputStream).

ShuffleBlockFetcherIterator allows for a synchronous iteration over shuffle blocks so a caller can handle them in a pipelined fashion as they are received.

ShuffleBlockFetcherIterator is exhausted (and can provide no elements) when the number of blocks already processed is at least the total number of blocks to fetch.

ShuffleBlockFetcherIterator throttles the remote fetches to avoid consuming too much memory.

Creating Instance

ShuffleBlockFetcherIterator takes the following to be created:

While being created, ShuffleBlockFetcherIterator initializes itself.

ShuffleBlockFetcherIterator is created when:

Initializing

initialize(): Unit

initialize registers a task cleanup and fetches shuffle blocks from remote and local BlockManagers.

Internally, initialize uses the TaskContext to register the ShuffleFetchCompletionListener (to clean up).

initialize then calls partitionBlocksByFetchMode.

initialize...FIXME

partitionBlocksByFetchMode

partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest]

partitionBlocksByFetchMode...FIXME

collectFetchRequests

collectFetchRequests(
  address: BlockManagerId,
  blockInfos: Seq[(BlockId, Long, Int)],
  collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit

collectFetchRequests...FIXME

createFetchRequests

createFetchRequests(
  curBlocks: Seq[FetchBlockInfo],
  address: BlockManagerId,
  isLast: Boolean,
  collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo]

createFetchRequests...FIXME

fetchUpToMaxBytes

fetchUpToMaxBytes(): Unit

fetchUpToMaxBytes...FIXME

fetchUpToMaxBytes is used when:

Sending Remote Shuffle Block Fetch Request

sendRequest(
  req: FetchRequest): Unit

sendRequest prints out the following DEBUG message to the logs:

Sending request for [n] blocks ([size]) from [hostPort]

sendRequest adds the size of the blocks in the FetchRequest to the bytesInFlight internal counter and increments the reqsInFlight internal counter.

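sendRequest requests the ShuffleClient to fetch the blocks with a new BlockFetchingListener. When the size of the blocks in the FetchRequest is greater than maxReqSizeShuffleToMem, sendRequest also passes in this ShuffleBlockFetcherIterator (as the DownloadFileManager) so that the blocks are fetched to disk rather than into memory.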
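The bookkeeping in sendRequest can be sketched as follows. This is a minimal, hypothetical sketch: FetchRequest is a simplified stand-in (block ids paired with sizes), InFlightTracker is an illustrative name, and the Boolean result only models the to-disk decision. No Spark API is called.

```scala
// Hypothetical, simplified stand-in for a fetch request (not Spark's class).
final case class FetchRequest(hostPort: String, blocks: Seq[(String, Long)]) {
  val size: Long = blocks.map(_._2).sum
}

// Hypothetical tracker for the two in-flight counters.
final class InFlightTracker(maxReqSizeShuffleToMem: Long) {
  var bytesInFlight: Long = 0L
  var reqsInFlight: Int = 0

  // Accounts for the request and returns true when its blocks
  // would be streamed to disk instead of being kept in memory.
  def sendRequest(req: FetchRequest): Boolean = {
    println(s"Sending request for ${req.blocks.size} blocks (${req.size} B) from ${req.hostPort}")
    bytesInFlight += req.size
    reqsInFlight += 1
    req.size > maxReqSizeShuffleToMem
  }
}

val tracker = new InFlightTracker(maxReqSizeShuffleToMem = 100L)
val toDiskSmall = tracker.sendRequest(FetchRequest("host-a:1111", Seq("b0" -> 40L, "b1" -> 20L)))
val toDiskLarge = tracker.sendRequest(FetchRequest("host-b:2222", Seq("b2" -> 150L)))
```

The first request (60 bytes) stays in memory while the second (150 bytes) crosses the illustrative 100-byte threshold; both are counted in the in-flight totals.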

sendRequest is used when:

BlockFetchingListener

sendRequest creates a new BlockFetchingListener to be notified about successes or failures of shuffle block fetch requests.

onBlockFetchSuccess

When ShuffleBlockFetcherIterator is not a zombie, the BlockFetchingListener's onBlockFetchSuccess adds a SuccessFetchResult to the results registry and prints out the following DEBUG message to the logs:

remainingBlocks: [remainingBlocks]

In the end, onBlockFetchSuccess prints out the following TRACE message to the logs:

Got remote block [blockId] after [time]

onBlockFetchFailure

On failure, the BlockFetchingListener's onBlockFetchFailure adds a FailureFetchResult to the results registry and prints out the following ERROR message to the logs:

Failed to get block(s) from [host]:[port]
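The two listener callbacks can be modelled with a toy class. The sketch assumes string block ids, a simplified FetchResult hierarchy and a hypothetical ToyFetcher that owns both the results queue and the zombie flag; it is not Spark's BlockFetchingListener.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified, hypothetical fetch results.
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
final case class FailureFetchResult(blockId: String, address: String, cause: Throwable) extends FetchResult

final class ToyFetcher(address: String, blockIds: Seq[String]) {
  val results = new LinkedBlockingQueue[FetchResult]()
  var isZombie: Boolean = false
  private var remainingBlocks: Set[String] = blockIds.toSet

  // Success: only recorded (and logged) while the fetcher is still active.
  def onBlockFetchSuccess(blockId: String, size: Long): Unit = synchronized {
    if (!isZombie) {
      remainingBlocks -= blockId
      results.put(SuccessFetchResult(blockId, size))
      println(s"remainingBlocks: $remainingBlocks")
    }
  }

  // Failure: logged and recorded as a FailureFetchResult.
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    System.err.println(s"Failed to get block(s) from $address")
    results.put(FailureFetchResult(blockId, address, e))
  }
}

val fetcher = new ToyFetcher("host-a:1111", Seq("b0", "b1", "b2"))
fetcher.onBlockFetchSuccess("b0", 10L)
fetcher.onBlockFetchFailure("b1", new java.io.IOException("connection reset"))
fetcher.isZombie = true
fetcher.onBlockFetchSuccess("b2", 10L) // ignored: the fetcher is now a zombie
```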

FetchResults

results: LinkedBlockingQueue[FetchResult]

ShuffleBlockFetcherIterator uses an internal FIFO blocking queue (Java) of FetchResults.

results is used for fetching the next element.

For remote blocks, FetchResults are added in sendRequest:

For local blocks, FetchResults are added in fetchLocalBlocks:

For host-local blocks, FetchResults are added in fetchHostLocalBlock:

FailureFetchResults can also be added in fetchHostLocalBlocks.

results is cleaned up in cleanup.
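The hand-off through a FIFO blocking queue can be illustrated with java.util.concurrent.LinkedBlockingQueue alone. In this sketch a background thread plays the role of the fetch callbacks (put) while the caller blocks in take(); fetchInBackground is a hypothetical helper, not part of Spark.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper: a producer thread "fetches" ids, the caller consumes them.
def fetchInBackground(ids: Seq[String]): Seq[String] = {
  val results = new LinkedBlockingQueue[String]()

  val producer = new Thread(new Runnable {
    def run(): Unit = {
      val it = ids.iterator
      while (it.hasNext) {
        Thread.sleep(10L)      // simulate network latency
        results.put(it.next()) // stands in for adding a FetchResult
      }
    }
  })
  producer.start()

  // take() blocks until an element is available and preserves FIFO order.
  val consumed = ids.map(_ => results.take())
  producer.join()
  consumed
}

val consumedIds = fetchInBackground(Seq("b0", "b1", "b2"))
```

Because take() blocks, the consumer never spins while waiting; it simply resumes as soon as the next result is put.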

hasNext

hasNext: Boolean

hasNext is part of the Iterator (Scala) abstraction (to test whether this iterator can provide another element).

hasNext is true when numBlocksProcessed is below numBlocksToFetch.

Retrieving Next Element

next(): (BlockId, InputStream)

next increments the numBlocksProcessed registry.

next takes (and removes) the head of the results queue.

next requests the ShuffleReadMetricsReporter to incFetchWaitTime.

next...FIXME

next throws a NoSuchElementException if there is no element left.

next is part of the Iterator (Scala) abstraction (to produce the next element of this iterator).
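hasNext and next can be sketched together on top of a LinkedBlockingQueue. ToyBlockIterator is hypothetical: results are pre-computed (blockId, bytes) pairs and fetchWaitTimeNs merely stands in for the incFetchWaitTime metric. The real next does considerably more.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified iterator over (blockId, InputStream) pairs.
final class ToyBlockIterator(numBlocksToFetch: Int, results: LinkedBlockingQueue[(String, Array[Byte])])
    extends Iterator[(String, InputStream)] {

  private var numBlocksProcessed = 0
  var fetchWaitTimeNs: Long = 0L // stands in for incFetchWaitTime

  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  override def next(): (String, InputStream) = {
    if (!hasNext) throw new NoSuchElementException()
    numBlocksProcessed += 1
    val start = System.nanoTime()
    val (blockId, bytes) = results.take() // blocks until a result arrives
    fetchWaitTimeNs += System.nanoTime() - start
    (blockId, new ByteArrayInputStream(bytes))
  }
}

val queue = new LinkedBlockingQueue[(String, Array[Byte])]()
queue.put("b0" -> "hello".getBytes("UTF-8"))
queue.put("b1" -> "world".getBytes("UTF-8"))
val it = new ToyBlockIterator(2, queue)
val ids = it.map(_._1).toList
```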

numBlocksProcessed

The number of blocks fetched and consumed

numBlocksToFetch

Total number of blocks to fetch and consume

ShuffleBlockFetcherIterator can produce up to numBlocksToFetch elements.

numBlocksToFetch is increased while ShuffleBlockFetcherIterator is requested to partitionBlocksByFetchMode, which prints it out in the following INFO message to the logs:

Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
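A sketch of how the two numbers in the INFO message relate. It assumes, for illustration only, that blocks are given as (blockId, size) pairs and that zero-size blocks are simply not fetched; where Spark actually filters empty blocks is not covered here.

```scala
// Hypothetical input: (blockId, sizeInBytes) pairs.
val blocks = Seq("b0" -> 128L, "b1" -> 0L, "b2" -> 64L)

val totalBlocks = blocks.size
val numBlocksToFetch = blocks.count { case (_, size) => size > 0 }

println(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
```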

releaseCurrentResultBuffer

releaseCurrentResultBuffer(): Unit

releaseCurrentResultBuffer...FIXME

releaseCurrentResultBuffer is used when:

  • ShuffleBlockFetcherIterator is requested to cleanup
  • BufferReleasingInputStream is requested to close

ShuffleFetchCompletionListener

ShuffleBlockFetcherIterator creates a ShuffleFetchCompletionListener when created.

ShuffleFetchCompletionListener is used in initialize and toCompletionIterator.

Cleaning Up

cleanup(): Unit

cleanup marks this ShuffleBlockFetcherIterator as a zombie.

cleanup releases the current result buffer.

cleanup iterates over the results internal queue and, for every SuccessFetchResult, increments the remote bytes read and remote blocks fetched shuffle task metrics (for blocks fetched from remote BlockManagers) and eventually releases the managed buffer.
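A simplified sketch of the draining loop in cleanup. ToyBuffer, ToyMetrics and the fromRemote flag on each result are hypothetical stand-ins, and releasing the current result buffer is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins for a managed buffer, fetch results and task metrics.
final class ToyBuffer(val size: Long) {
  var released = false
  def release(): Unit = released = true
}
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, fromRemote: Boolean, buf: ToyBuffer) extends FetchResult
final case class FailureFetchResult(blockId: String) extends FetchResult
final class ToyMetrics { var remoteBytesRead = 0L; var remoteBlocksFetched = 0 }

final class ToyFetcherCleanup(results: LinkedBlockingQueue[FetchResult], metrics: ToyMetrics) {
  var isZombie = false

  def cleanup(): Unit = {
    synchronized { isZombie = true }
    // Drain everything still queued so that no buffer is leaked.
    var r = results.poll()
    while (r != null) {
      r match {
        case SuccessFetchResult(_, fromRemote, buf) =>
          if (fromRemote) {
            metrics.remoteBytesRead += buf.size
            metrics.remoteBlocksFetched += 1
          }
          buf.release()
        case _: FailureFetchResult => // nothing to release
      }
      r = results.poll()
    }
  }
}

val queue = new LinkedBlockingQueue[FetchResult]()
val remoteBuf = new ToyBuffer(100L)
val localBuf = new ToyBuffer(50L)
queue.put(SuccessFetchResult("b0", fromRemote = true, remoteBuf))
queue.put(SuccessFetchResult("b1", fromRemote = false, localBuf))
queue.put(FailureFetchResult("b2"))
val metrics = new ToyMetrics
val fetcherToClean = new ToyFetcherCleanup(queue, metrics)
fetcherToClean.cleanup()
```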

bytesInFlight

The total size (in bytes) of remote shuffle blocks in flight

Starts at 0 when ShuffleBlockFetcherIterator is created

Incremented in every sendRequest and decremented in every next.

ShuffleBlockFetcherIterator makes sure that bytesInFlight does not exceed maxBytesInFlight for every remote shuffle block fetch.

reqsInFlight

The number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created

Incremented in every sendRequest and decremented in every next.

ShuffleBlockFetcherIterator makes sure that reqsInFlight does not exceed maxReqsInFlight for every remote shuffle block fetch.
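The two in-flight limits can be sketched as a single admission check. ToyThrottle, its limits and the complete method are hypothetical. In particular, letting a request through whenever nothing is in flight (bytesInFlight == 0) is an assumption of this sketch, so that a single request larger than maxBytesInFlight can still make progress.

```scala
import scala.collection.mutable

// Hypothetical throttle tracking both in-flight counters.
final class ToyThrottle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
  var bytesInFlight = 0L
  var reqsInFlight = 0

  // Admit a request if nothing is in flight, or if both limits still hold after adding it.
  def canSend(reqSize: Long): Boolean =
    bytesInFlight == 0 ||
      (reqsInFlight + 1 <= maxReqsInFlight && bytesInFlight + reqSize <= maxBytesInFlight)

  def send(reqSize: Long): Unit = { bytesInFlight += reqSize; reqsInFlight += 1 }

  // Called once a request's results have been consumed.
  def complete(reqSize: Long): Unit = { bytesInFlight -= reqSize; reqsInFlight -= 1 }
}

val throttle = new ToyThrottle(maxBytesInFlight = 100L, maxReqsInFlight = 2)
val pending = mutable.Queue(60L, 30L, 50L)
val sent = mutable.ArrayBuffer.empty[Long]
while (pending.nonEmpty && throttle.canSend(pending.head)) {
  val size = pending.dequeue()
  throttle.send(size)
  sent += size
}
```

Here the third request (50 bytes) is held back because a third request would exceed maxReqsInFlight; it becomes fetchable once the first request completes.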

isZombie

Controls whether ShuffleBlockFetcherIterator is still active and records SuccessFetchResults on successful shuffle block fetches.

Starts false when ShuffleBlockFetcherIterator is created

Enabled (true) in cleanup.

When enabled, registerTempFileToClean is a no-op.
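A sketch of the zombie check in a temporary-file registry. ToyTempFileRegistry is hypothetical, and returning false (rather than Unit) to signal that nothing was registered is an assumption of this sketch.

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical registry of temporary download files to delete on cleanup.
final class ToyTempFileRegistry {
  private val tempFiles = mutable.HashSet.empty[File]
  var isZombie = false

  // Once the owner is a zombie, registration is skipped and false is returned.
  def registerTempFileToClean(file: File): Boolean = synchronized {
    if (isZombie) false
    else { tempFiles += file; true }
  }

  def registered: Set[File] = synchronized { tempFiles.toSet }
}

val registry = new ToyTempFileRegistry
val first = registry.registerTempFileToClean(new File("a.tmp"))
registry.isZombie = true
val second = registry.registerTempFileToClean(new File("b.tmp"))
```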

DownloadFileManager

ShuffleBlockFetcherIterator is a DownloadFileManager.

throwFetchFailedException

throwFetchFailedException(
  blockId: BlockId,
  mapIndex: Int,
  address: BlockManagerId,
  e: Throwable,
  message: Option[String] = None): Nothing

throwFetchFailedException takes the message (if defined) or uses the message of the given Throwable.

In the end, throwFetchFailedException throws a FetchFailedException if the BlockId is either a ShuffleBlockId or a ShuffleBlockBatchId. Otherwise, throwFetchFailedException throws a SparkException:

Failed to get block [blockId], which is not a shuffle block

throwFetchFailedException is used when:

  • ShuffleBlockFetcherIterator is requested to next
  • BufferReleasingInputStream is requested to tryOrFetchFailedException
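The message and exception-type selection in throwFetchFailedException can be sketched with a toy BlockId hierarchy. All types here are simplified, hypothetical stand-ins: ToyFetchFailedException plays FetchFailedException, RuntimeException plays SparkException, and the mapIndex and address parameters are dropped.

```scala
// Hypothetical, simplified block ids (not Spark's classes).
sealed trait BlockId
final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
final case class ShuffleBlockBatchId(shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) extends BlockId
final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

final class ToyFetchFailedException(val blockId: BlockId, message: String, cause: Throwable)
  extends Exception(message, cause)

def throwFetchFailedException(blockId: BlockId, e: Throwable, message: Option[String] = None): Nothing = {
  // Prefer the explicit message, fall back to the cause's message.
  val msg = message.getOrElse(e.getMessage)
  blockId match {
    case _: ShuffleBlockId | _: ShuffleBlockBatchId =>
      throw new ToyFetchFailedException(blockId, msg, e)
    case _ =>
      throw new RuntimeException(s"Failed to get block $blockId, which is not a shuffle block", e)
  }
}

val cause = new java.io.IOException("stream closed")
val shuffleError = scala.util.Try(throwFetchFailedException(ShuffleBlockId(0, 1L, 2), cause)).failed.get
val batchError = scala.util.Try(throwFetchFailedException(ShuffleBlockBatchId(0, 1L, 0, 2), cause, Some("custom"))).failed.get
val rddError = scala.util.Try(throwFetchFailedException(RDDBlockId(3, 0), cause)).failed.get
```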

Logging

Enable ALL logging level for the org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=ALL

Refer to Logging.