= ShuffleBlockFetcherIterator

ShuffleBlockFetcherIterator is a Scala http://www.scala-lang.org/api/current/scala/collection/Iterator.html[Iterator] that fetches shuffle blocks (aka shuffle map outputs) from block managers.

ShuffleBlockFetcherIterator is <<creating-instance,created>> exclusively when BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read combined key-value records for a reduce task].

ShuffleBlockFetcherIterator allows for <<next,iterating over shuffle blocks>> as (BlockId, InputStream) pairs so a caller can handle shuffle blocks in a pipelined fashion as they are received.

ShuffleBlockFetcherIterator is exhausted (i.e. <<hasNext,hasNext>> is false) when the <<numBlocksProcessed,number of blocks processed>> is at least the <<numBlocksToFetch,total number of blocks to fetch>>.

ShuffleBlockFetcherIterator <<fetchUpToMaxBytes,throttles remote fetches>> to avoid consuming too much memory.

[[internal-registries]]
.ShuffleBlockFetcherIterator's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| numBlocksProcessed | [[numBlocksProcessed]] The number of blocks <<next,processed>>.

| numBlocksToFetch a| [[numBlocksToFetch]] Total number of blocks to <<next,fetch>>.

ShuffleBlockFetcherIterator can <<hasNext,produce>> up to numBlocksToFetch elements.

numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to <<splitLocalRemoteBlocks,splitLocalRemoteBlocks>>, which prints it out in the following INFO message to the logs:

Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks

| [[results]] results | Internal FIFO blocking queue (using Java's https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html[java.util.concurrent.LinkedBlockingQueue]) to hold FetchResult remote and local fetch results.

Used in:

  1. <<next,next>> to take one FetchResult off the queue,

  2. <<sendRequest,sendRequest>> to put SuccessFetchResult or FailureFetchResult remote fetch results (as part of BlockFetchingListener callback),

  3. <<fetchLocalBlocks,fetchLocalBlocks>> (similarly to <<sendRequest,sendRequest>>) to put local fetch results,

  4. <<cleanup,cleanup>> to release managed buffers for SuccessFetchResult results.

| [[maxBytesInFlight]] maxBytesInFlight | The maximum size (in bytes) of all the remote shuffle blocks to fetch.

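Set when <<creating-instance,ShuffleBlockFetcherIterator is created>>.

| [[maxReqsInFlight]] maxReqsInFlight | The maximum number of remote requests to fetch shuffle blocks.

Set when <<creating-instance,ShuffleBlockFetcherIterator is created>>.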

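| [[bytesInFlight]] bytesInFlight | The bytes of fetched remote shuffle blocks in flight.

Starts at 0 when <<creating-instance,ShuffleBlockFetcherIterator is created>>.

Incremented every <<sendRequest,sendRequest>> and decremented every <<next,next>>.

ShuffleBlockFetcherIterator makes sure that the invariant of bytesInFlight below <<maxBytesInFlight,maxBytesInFlight>> holds every <<fetchUpToMaxBytes,fetchUpToMaxBytes>>.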

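| [[reqsInFlight]] reqsInFlight | The number of remote shuffle block fetch requests in flight.

Starts at 0 when <<creating-instance,ShuffleBlockFetcherIterator is created>>.

Incremented every <<sendRequest,sendRequest>> and decremented every <<next,next>>.

ShuffleBlockFetcherIterator makes sure that the invariant of reqsInFlight below <<maxReqsInFlight,maxReqsInFlight>> holds every <<fetchUpToMaxBytes,fetchUpToMaxBytes>>.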

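| [[isZombie]] isZombie | Flag that says whether ShuffleBlockFetcherIterator is no longer active (a zombie). It is disabled, i.e. false, when <<creating-instance,ShuffleBlockFetcherIterator is created>>.

<<cleanup,When enabled>> (when the task using ShuffleBlockFetcherIterator finishes), the <<sendRequest-BlockFetchingListener-onBlockFetchSuccess,onBlockFetchSuccess callback>> (registered in sendRequest) no longer adds fetched remote shuffle blocks to the <<results,results>> internal queue.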

| [[currentResult]] currentResult | The currently-processed SuccessFetchResult.

Set when ShuffleBlockFetcherIterator <<next,returns the next (BlockId, InputStream) tuple>> and <<releaseCurrentResultBuffer,released>> (on <<cleanup,cleanup>>).
|===
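
The bytesInFlight and reqsInFlight invariants from the table can be pictured with a small standalone model. This is only a sketch under assumptions, not Spark's code: the `Throttle` class and its `canSend`, `send` and `received` methods are hypothetical names, and the rule that a request is always admitted when nothing is in flight is an assumption made so that an oversized request cannot block progress forever.

```scala
// Hypothetical stand-in for the iterator's in-flight accounting; not Spark's actual code.
final class Throttle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
  private var bytesInFlight = 0L
  private var reqsInFlight = 0

  // Assumed admission rule: allow a request when nothing is in flight,
  // or when both limits would still hold after sending it.
  def canSend(requestBytes: Long): Boolean =
    bytesInFlight == 0 ||
      (reqsInFlight + 1 <= maxReqsInFlight &&
        bytesInFlight + requestBytes <= maxBytesInFlight)

  // Both counters go up when a request is sent.
  def send(requestBytes: Long): Unit = {
    require(canSend(requestBytes), "in-flight limits would be exceeded")
    bytesInFlight += requestBytes
    reqsInFlight += 1
  }

  // Both counters go down when the fetched data has been consumed.
  def received(requestBytes: Long): Unit = {
    bytesInFlight -= requestBytes
    reqsInFlight -= 1
  }

  def inFlight: (Long, Int) = (bytesInFlight, reqsInFlight)
}
```

A caller would check `canSend` before every `send` and keep the remaining requests queued until `received` frees enough capacity.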

[TIP]
====
Enable ERROR, WARN, INFO, DEBUG or TRACE logging level for org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens in ShuffleBlockFetcherIterator.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE

Refer to spark-logging.md[Logging].
====

== [[fetchUpToMaxBytes]] fetchUpToMaxBytes Method

CAUTION: FIXME

== [[creating-instance]] Creating ShuffleBlockFetcherIterator Instance

When created, ShuffleBlockFetcherIterator takes the following:

  • [[context]] spark-TaskContext.md[TaskContext]
  • [[shuffleClient]] storage:ShuffleClient.md[ShuffleClient]
  • [[blockManager]] storage:BlockManager.md[BlockManager]
  • [[blocksByAddress]] Blocks to fetch per storage:BlockManager.md[BlockManager] (as Seq[(BlockManagerId, Seq[(BlockId, Long)])])
  • [[streamWrapper]] Function to wrap the returned input stream (as (BlockId, InputStream) => InputStream)
  • <<maxBytesInFlight,maxBytesInFlight>> -- the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxSizeInFlight[spark.reducer.maxSizeInFlight] Spark property)
  • <<maxReqsInFlight,maxReqsInFlight>> -- the maximum number of remote requests to fetch blocks at any given point (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxReqsInFlight[spark.reducer.maxReqsInFlight] Spark property)
  • [[maxBlocksInFlightPerAddress]] maxBlocksInFlightPerAddress
  • [[maxReqSizeShuffleToMem]] maxReqSizeShuffleToMem
  • [[detectCorrupt]] detectCorrupt flag to detect any corruption in fetched blocks (controlled by shuffle:BlockStoreShuffleReader.md#spark_shuffle_detectCorrupt[spark.shuffle.detectCorrupt] Spark property)

== [[initialize]] Initializing ShuffleBlockFetcherIterator -- initialize Internal Method

[source, scala]
----
initialize(): Unit
----

initialize registers a task cleanup and fetches shuffle blocks from remote and local storage:BlockManager.md[BlockManagers].

Internally, initialize spark-TaskContext.md#addTaskCompletionListener[registers a TaskCompletionListener] (that will <<cleanup,clean up>> right after the task finishes).

initialize <<splitLocalRemoteBlocks,splits the blocks into local and remote ones>>.

initialize registers the remote fetch requests (in the fetchRequests internal registry).

As ShuffleBlockFetcherIterator is in the initialization phase, initialize makes sure that <<reqsInFlight,reqsInFlight>> and <<bytesInFlight,bytesInFlight>> internal counters are both 0. Otherwise, initialize throws an exception.

initialize <<fetchUpToMaxBytes,fetches shuffle blocks>> (from remote storage:BlockManager.md[BlockManagers]).

You should see the following INFO message in the logs:

INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms

initialize <<fetchLocalBlocks,fetches local shuffle blocks>>.

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Got local blocks in  [time] ms

NOTE: initialize is used exclusively when ShuffleBlockFetcherIterator is <<creating-instance,created>>.

== [[sendRequest]] Sending Remote Shuffle Block Fetch Request -- sendRequest Internal Method

[source, scala]
----
sendRequest(req: FetchRequest): Unit
----

Internally, when sendRequest runs, you should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]

sendRequest increments <<bytesInFlight,bytesInFlight>> and <<reqsInFlight,reqsInFlight>> internal counters.

NOTE: The input FetchRequest contains the remote storage:BlockManagerId.md[BlockManagerId] address and the shuffle blocks to fetch (as a sequence of storage:BlockId.md[BlockId] and their sizes).

sendRequest storage:ShuffleClient.md#fetchBlocks[requests ShuffleClient to fetch shuffle blocks] (from the host, the port, and the executor as defined in the input FetchRequest).

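NOTE: ShuffleClient was defined when <<creating-instance,ShuffleBlockFetcherIterator was created>>.

sendRequest registers a BlockFetchingListener with ShuffleClient that:

  1. <<sendRequest-BlockFetchingListener-onBlockFetchSuccess,For every successfully fetched block>> adds it as SuccessFetchResult to <<results,results>> internal queue.

  2. <<sendRequest-BlockFetchingListener-onBlockFetchFailure,For every block fetch failure>> adds it as FailureFetchResult to <<results,results>> internal queue.

NOTE: sendRequest is used exclusively when ShuffleBlockFetcherIterator is requested to <<fetchUpToMaxBytes,fetchUpToMaxBytes>>.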

=== [[sendRequest-BlockFetchingListener-onBlockFetchSuccess]] onBlockFetchSuccess Callback

[source, scala]
----
onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit
----

Internally, onBlockFetchSuccess checks whether the <<isZombie,iterator is a zombie>> and does the further processing only if it is not.

onBlockFetchSuccess marks the input blockId as received (i.e. removes it from all the blocks to fetch as requested in <<sendRequest,sendRequest>>).

onBlockFetchSuccess adds the managed buf (as SuccessFetchResult) to <<results,results>> internal queue.

You should see the following DEBUG message in the logs:

DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]

Regardless of zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:

TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms

=== [[sendRequest-BlockFetchingListener-onBlockFetchFailure]] onBlockFetchFailure Callback

[source, scala]
----
onBlockFetchFailure(blockId: String, e: Throwable): Unit
----

When onBlockFetchFailure is called, you should see the following ERROR message in the logs:

ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]

onBlockFetchFailure adds the block (as FailureFetchResult) to <<results,results>> internal queue.
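
The two callbacks can be sketched as a listener that feeds a shared blocking queue. This is a simplified illustration, not Spark's BlockFetchingListener: `SketchListener`, its `remaining` accessor and the cut-down `FetchResult` case classes are hypothetical, and block sizes are passed in as a plain `Map` instead of being taken from a FetchRequest.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified stand-ins for Spark's FetchResult hierarchy.
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
final case class FailureFetchResult(blockId: String, cause: Throwable) extends FetchResult

// Sketch of a listener that reports fetch outcomes through a shared results queue.
final class SketchListener(
    results: LinkedBlockingQueue[FetchResult],
    blockSizes: Map[String, Long]) {

  // Set to true once the owning iterator has been cleaned up.
  @volatile var isZombie = false

  private val remainingBlocks = scala.collection.mutable.Set.empty[String]
  remainingBlocks ++= blockSizes.keys

  def onBlockFetchSuccess(blockId: String): Unit = synchronized {
    // A zombie iterator has no consumer left, so the block is dropped.
    if (!isZombie) {
      remainingBlocks -= blockId
      results.put(SuccessFetchResult(blockId, blockSizes(blockId)))
    }
  }

  // Failures are always queued so that the consumer can raise them.
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
    results.put(FailureFetchResult(blockId, e))

  def remaining: Set[String] = synchronized(remainingBlocks.toSet)
}
```

Routing both outcomes through one queue lets the consuming thread handle successes and failures in arrival order with a single blocking `take()`.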

== [[throwFetchFailedException]] Throwing FetchFailedException (for ShuffleBlockId) -- throwFetchFailedException Internal Method

[source, scala]
----
throwFetchFailedException(
  blockId: BlockId,
  address: BlockManagerId,
  e: Throwable): Nothing
----

throwFetchFailedException throws a shuffle:FetchFailedException.md[FetchFailedException] when the input blockId is a ShuffleBlockId.

NOTE: throwFetchFailedException creates a FetchFailedException passing on the root cause of a failure, i.e. the input e.

Otherwise, throwFetchFailedException throws a SparkException:

Failed to get block [blockId], which is not a shuffle block

NOTE: throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the <<next,next element>>.
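
The branching on the block type can be sketched with a pattern match. Everything in the following block is a hypothetical stand-in: the `FetchFailureSketch` object, the reduced `BlockId` hierarchy, the `String` address and `SketchFetchFailedException` are assumptions for illustration, and `IllegalStateException` takes the place of SparkException to keep the example free of Spark dependencies.

```scala
object FetchFailureSketch {
  // Hypothetical, reduced block id hierarchy; Spark's own types are richer.
  sealed trait BlockId { def name: String }
  final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
    def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    def name: String = s"rdd_${rddId}_${splitIndex}"
  }

  // Stand-in for FetchFailedException; it keeps the root cause of the failure.
  final class SketchFetchFailedException(
      val address: String,
      val blockId: ShuffleBlockId,
      cause: Throwable)
    extends Exception(s"Failed to fetch ${blockId.name} from $address", cause)

  // The return type Nothing documents that this method never returns normally.
  def throwFetchFailed(blockId: BlockId, address: String, e: Throwable): Nothing =
    blockId match {
      case shuffleBlock: ShuffleBlockId =>
        throw new SketchFetchFailedException(address, shuffleBlock, e)
      case other =>
        // IllegalStateException stands in for SparkException here.
        throw new IllegalStateException(
          s"Failed to get block ${other.name}, which is not a shuffle block", e)
    }
}
```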

== [[cleanup]] Releasing Resources -- cleanup Internal Method

[source, scala]
----
cleanup(): Unit
----

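Internally, cleanup marks ShuffleBlockFetcherIterator as a <<isZombie,zombie>>.

cleanup <<releaseCurrentResultBuffer,releases the current result buffer>>.

cleanup iterates over <<results,results>> internal queue and, for every SuccessFetchResult, increments the remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.

NOTE: cleanup is used when <<initialize,ShuffleBlockFetcherIterator initializes itself>> (as the task cleanup that runs right after the task finishes).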
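
The cleanup steps can be sketched as draining a queue of pending results. The following is an illustration under assumptions, not Spark's implementation: the `CleanupSketch` object and its `Buffer`, `Metrics` and `Fetcher` classes are hypothetical, and the queue is drained with `poll()`, which may differ from how Spark walks the queue.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CleanupSketch {
  // Hypothetical stand-in for a ManagedBuffer that can be released.
  final class Buffer(val size: Long) {
    var released = false
    def release(): Unit = { released = true }
  }

  sealed trait FetchResult
  final case class Success(blockId: String, buf: Buffer) extends FetchResult
  final case class Failure(blockId: String, e: Throwable) extends FetchResult

  // Hypothetical stand-in for the shuffle read metrics of a task.
  final class Metrics {
    var remoteBytesRead = 0L
    var remoteBlocksFetched = 0L
  }

  final class Fetcher(val results: LinkedBlockingQueue[FetchResult], val metrics: Metrics) {
    @volatile var isZombie = false

    def cleanup(): Unit = {
      // Mark the iterator as a zombie first so callbacks stop adding results.
      isZombie = true
      // Drain whatever is still queued; poll() returns null once the queue is empty.
      var next = results.poll()
      while (next != null) {
        next match {
          case Success(_, buf) =>
            metrics.remoteBytesRead += buf.size
            metrics.remoteBlocksFetched += 1
            buf.release()
          case _ => // failures hold no buffer to release
        }
        next = results.poll()
      }
    }
  }
}
```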

== [[releaseCurrentResultBuffer]] Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) -- releaseCurrentResultBuffer Internal Method

[source, scala]
----
releaseCurrentResultBuffer(): Unit
----

releaseCurrentResultBuffer decrements the <<currentResult,currently-processed SuccessFetchResult reference>>'s buffer reference count if there is any.

releaseCurrentResultBuffer releases <<currentResult,currentResult>>.

NOTE: releaseCurrentResultBuffer is used when <<cleanup,ShuffleBlockFetcherIterator releases resources>> and BufferReleasingInputStream closes.
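
A minimal sketch of the release step follows, assuming a reference-counted buffer. The `ReleaseSketch` object, `RefCountedBuffer`, `SuccessResult` and `Holder` are hypothetical names, and clearing the reference by assigning `null` is an assumption about what "releases" means here.

```scala
object ReleaseSketch {
  // Hypothetical reference-counted buffer standing in for ManagedBuffer.
  final class RefCountedBuffer {
    private var refCount = 1
    def release(): Unit = { if (refCount > 0) refCount -= 1 }
    def references: Int = refCount
  }

  final case class SuccessResult(blockId: String, buf: RefCountedBuffer)

  final class Holder {
    // null means that no block is currently being processed.
    var currentResult: SuccessResult = null

    def releaseCurrentResultBuffer(): Unit = {
      // Decrement the buffer's reference count only when there is a current result.
      if (currentResult != null) currentResult.buf.release()
      // Drop the reference so a second call becomes a no-op.
      currentResult = null
    }
  }
}
```

Clearing the reference makes the method safe to call from both the cleanup path and a stream close without releasing the same buffer twice.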

== [[fetchLocalBlocks]] fetchLocalBlocks Internal Method

[source, scala]
----
fetchLocalBlocks(): Unit
----

fetchLocalBlocks...FIXME

NOTE: fetchLocalBlocks is used when...FIXME

== [[hasNext]] hasNext Method

[source, scala]
----
hasNext: Boolean
----

NOTE: hasNext is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#hasNext:Boolean++[Iterator Contract] to test whether this iterator can provide another element.

hasNext is positive (true) when <<numBlocksProcessed,numBlocksProcessed>> is less than <<numBlocksToFetch,numBlocksToFetch>>.

Otherwise, hasNext is negative (false).
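
The counter comparison can be shown with a toy iterator. This is a sketch only: `CountingIterator` is a hypothetical class over an in-memory `Seq`, and its `next` is a placeholder that advances the counter, not the real fetching logic.

```scala
// Toy iterator; the counter names follow the registries described in this document.
final class CountingIterator(blocks: Seq[String]) extends Iterator[String] {
  private val numBlocksToFetch = blocks.size
  private var numBlocksProcessed = 0

  // More elements are available while fewer blocks were processed than expected.
  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  // Placeholder: hands out the next in-memory block and advances the counter.
  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("no more blocks")
    val block = blocks(numBlocksProcessed)
    numBlocksProcessed += 1
    block
  }
}
```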

== [[splitLocalRemoteBlocks]] splitLocalRemoteBlocks Internal Method

[source, scala]
----
splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]
----

splitLocalRemoteBlocks...FIXME

NOTE: splitLocalRemoteBlocks is used exclusively when ShuffleBlockFetcherIterator is requested to <<initialize,initialize>>.

== [[next]] Retrieving Next Element -- next Method

[source, scala]
----
next(): (BlockId, InputStream)
----

NOTE: next is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#next():A++[Iterator Contract] to produce the next element of this iterator.

next...FIXME


Last update: 2020-10-06