
ShuffleMapTask

ShuffleMapTask is one of the two types of tasks. When executed, it writes the result of executing a serialized task binary over the records (of an RDD partition) to the shuffle:ShuffleManager.md[shuffle system] and returns a MapStatus.md[MapStatus] (information about the storage:BlockManager.md[BlockManager] and the estimated size of the result shuffle blocks).

ShuffleMapTask and DAGScheduler

Creating Instance

ShuffleMapTask takes the following to be created:

  • [[stageId]] Stage ID
  • [[stageAttemptId]] Stage attempt ID
  • Broadcast variable with the serialized task binary
  • [[partition]] Partition
  • [[locs]] TaskLocations
  • [[localProperties]] Task-specific local properties
  • [[serializedTaskMetrics]] Serialized task metrics (Array[Byte])
  • [[jobId]] Optional job ID (default: None)
  • [[appId]] Optional application ID (default: None)
  • [[appAttemptId]] Optional application attempt ID (default: None)
  • [[isBarrier]] isBarrier flag (default: false)

ShuffleMapTask is created when DAGScheduler is requested to DAGScheduler.md#submitMissingTasks[submit tasks for all missing partitions of a ShuffleMapStage].
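The parameter list above, with its defaults, can be pictured as a plain Scala case class. This is only a sketch: `ShuffleMapTaskArgs`, `PartitionStub` and `TaskLocationStub` are hypothetical stand-ins, the broadcast variable is reduced to a raw byte array, and the element types of the optional IDs are assumptions.

```scala
import java.util.Properties

// Hypothetical, simplified stand-ins for Spark's internal types.
final case class PartitionStub(index: Int)
final case class TaskLocationStub(host: String)

// Mirrors the documented parameters; the last four carry the documented defaults.
final case class ShuffleMapTaskArgs(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Array[Byte],              // stands in for Broadcast[Array[Byte]]
    partition: PartitionStub,
    locs: Seq[TaskLocationStub],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,            // element type is an assumption
    appId: Option[String] = None,         // element type is an assumption
    appAttemptId: Option[String] = None,  // element type is an assumption
    isBarrier: Boolean = false)

object ShuffleMapTaskArgsExample {
  // Only the mandatory arguments are given; the optional ones fall back to their defaults.
  val args: ShuffleMapTaskArgs = ShuffleMapTaskArgs(
    stageId = 0,
    stageAttemptId = 0,
    taskBinary = Array.emptyByteArray,
    partition = PartitionStub(3),
    locs = Seq(TaskLocationStub("host-a")),
    localProperties = new Properties(),
    serializedTaskMetrics = Array.emptyByteArray)
}
```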

== [[taskBinary]] Broadcast Variable and Serialized Task Binary

ShuffleMapTask is given a ROOT:Broadcast.md[Broadcast] with a reference to a serialized task binary (Broadcast[Array[Byte]]).

runTask expects that the serialized task binary is a tuple of an ../rdd/RDD.md[RDD] and a ShuffleDependency.
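To illustrate the idea of a byte array that holds a serialized pair, here is a minimal round-trip sketch. It uses plain Java serialization rather than Spark's closure serializer, and `RddStub` and `ShuffleDependencyStub` are hypothetical stand-ins for the real RDD and ShuffleDependency.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinarySketch {
  // Hypothetical stand-ins; case classes are serializable out of the box.
  final case class RddStub(id: Int, numPartitions: Int)
  final case class ShuffleDependencyStub(shuffleId: Int)

  // Plain Java serialization (an assumption; not Spark's closure serializer).
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // The "task binary": one byte array holding the (RDD, dependency) pair.
  val taskBinary: Array[Byte] = serialize((RddStub(7, 4), ShuffleDependencyStub(0)))

  // What the task side does: turn the bytes back into the pair.
  val (rdd, dep) = deserialize[(RddStub, ShuffleDependencyStub)](taskBinary)
}
```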

== [[runTask]] Running Task

[source, scala]

runTask(context: TaskContext): MapStatus


runTask writes the result (records) of executing the serialized task binary over the records (in the RDD partition) to the shuffle:ShuffleManager.md[shuffle system] and returns a MapStatus.md[MapStatus] (with the storage:BlockManager.md[BlockManager] and an estimated size of the result shuffle blocks).

Internally, runTask requests the core:SparkEnv.md[SparkEnv] for the core:SparkEnv.md#closureSerializer[closure serializer], creates a new serializer instance, and requests it to serializer:Serializer.md#deserialize[deserialize] the serialized task binary (into a tuple of an ../rdd/RDD.md[RDD] and a ShuffleDependency).

runTask measures the Task.md#_executorDeserializeTime[elapsed (wall-clock)] and Task.md#_executorDeserializeCpuTime[CPU] deserialization times.
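Measuring both an elapsed time and a per-thread CPU time around a block of work can be sketched with the JDK alone. The helper name `timed` is hypothetical, and whether Spark takes its measurements in exactly this way is an assumption.

```scala
import java.lang.management.ManagementFactory

object DeserializeTiming {
  // Returns (result, elapsed wall-clock nanos, CPU nanos of the current thread).
  // CPU time is reported as 0 when the JVM does not support per-thread CPU timing.
  def timed[A](body: => A): (A, Long, Long) = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    val cpuSupported = threadMXBean.isCurrentThreadCpuTimeSupported
    val startWall = System.nanoTime()
    val startCpu = if (cpuSupported) threadMXBean.getCurrentThreadCpuTime else 0L
    val result = body
    val wall = System.nanoTime() - startWall
    val cpu = if (cpuSupported) threadMXBean.getCurrentThreadCpuTime - startCpu else 0L
    (result, wall, cpu)
  }
}
```

For example, `DeserializeTiming.timed(expensiveDeserialization())` returns the deserialized value together with both durations, where `expensiveDeserialization` is whatever work is being measured.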

runTask requests the core:SparkEnv.md[SparkEnv] for the core:SparkEnv.md#shuffleManager[ShuffleManager] and requests it for a shuffle:ShuffleManager.md#getWriter[ShuffleWriter] (for the ShuffleHandle, the RDD partition, and the TaskContext).

runTask then requests the RDD for the ../rdd/RDD.md#iterator[records] (of the partition) that the ShuffleWriter is requested to shuffle:ShuffleWriter.md#write[write out] (to the shuffle system).

In the end, runTask requests the ShuffleWriter to shuffle:ShuffleWriter.md#stop[stop] (with the success flag on) and returns the MapStatus.md[shuffle map output status].

NOTE: This is the moment in the lifecycle of a task (and its RDD) when an ../rdd/index.md#iterator[RDD partition is computed] and becomes a sequence of records (i.e. real data) on an executor.

In case of any exceptions, runTask requests the ShuffleWriter to shuffle:ShuffleWriter.md#stop[stop] (with the success flag off) and (re)throws the exception.

If the ShuffleWriter could not be shuffle:ShuffleWriter.md#stop[stopped] either, runTask prints out the following DEBUG message to the logs:

[source,plaintext]

Could not stop writer

runTask is part of the Task.md#runTask[Task] abstraction.
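The write-then-stop flow described in this section, including the failure path, can be sketched as follows. `WriterStub` and `RecordingWriter` are hypothetical simplifications of a shuffle writer, a plain String stands in for the MapStatus, and `println` stands in for the DEBUG log message.

```scala
object RunTaskFlowSketch {
  // Hypothetical, simplified writer; Spark's ShuffleWriter has a richer contract.
  trait WriterStub[T] {
    def write(records: Iterator[T]): Unit
    def stop(success: Boolean): Option[String] // Some(status) only on success
  }

  // Test double that remembers what was written and how it was stopped.
  final class RecordingWriter[T] extends WriterStub[T] {
    var written: Vector[T] = Vector.empty
    var stoppedWith: Option[Boolean] = None
    def write(records: Iterator[T]): Unit = records.foreach(r => written :+= r)
    def stop(success: Boolean): Option[String] = {
      stoppedWith = Some(success)
      if (success) Some(s"map status for ${written.size} records") else None
    }
  }

  def runTask[T](records: => Iterator[T], writer: WriterStub[T]): String =
    try {
      writer.write(records)             // computes the partition and writes it out
      writer.stop(success = true).get   // success flag on: a status is returned
    } catch {
      case e: Exception =>
        // Success flag off; a failure while stopping is only reported, not rethrown.
        try writer.stop(success = false)
        catch { case inner: Exception => println(s"Could not stop writer: $inner") }
        throw e                         // the original exception is rethrown
    }
}
```

Keeping the second `stop` call inside its own `try` means a failure while cleaning up never hides the exception that caused the task to fail in the first place.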

== [[preferredLocations]] preferredLocations Method

[source, scala]

preferredLocations: Seq[TaskLocation]

preferredLocations simply returns the preferredLocs internal property.

preferredLocations is part of the Task.md#preferredLocations[Task] abstraction.

== [[logging]] Logging

Enable the ALL logging level for the org.apache.spark.scheduler.ShuffleMapTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source,plaintext]

log4j.logger.org.apache.spark.scheduler.ShuffleMapTask=ALL

Refer to ROOT:spark-logging.md[Logging].

== [[preferredLocs]] Preferred Locations

TaskLocation.md[TaskLocations] that are the unique entries in the given locs. When locs is not defined, preferredLocs is empty and no task location preferences are defined.

Initialized when ShuffleMapTask is created.

Used exclusively when ShuffleMapTask is requested for the preferred locations.
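The rule for preferred locations fits in one expression. In this sketch `TaskLocationStub` is a hypothetical stand-in for TaskLocation, "not defined" is modelled as `null` (an assumption), and `distinct` is used to drop duplicates, which may differ from how the real class deduplicates.

```scala
object PreferredLocsSketch {
  final case class TaskLocationStub(host: String) // hypothetical stand-in

  // No locs given (modelled as null here): no location preferences at all.
  // Otherwise: the unique entries of locs, in first-seen order.
  def preferredLocs(locs: Seq[TaskLocationStub]): Seq[TaskLocationStub] =
    if (locs == null) Nil else locs.distinct
}
```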


Last update: 2020-10-10