ShuffleMapTask

ShuffleMapTask is one of the two types of tasks. When executed, it writes the result of running the serialized task code over the records of an RDD partition to the shuffle system and returns a MapStatus (information about the BlockManager and the estimated size of the result shuffle blocks).

Figure 1. ShuffleMapTask and DAGScheduler

Creating Instance

ShuffleMapTask takes the following to be created:

ShuffleMapTask is created when DAGScheduler is requested to submit tasks for all missing partitions of a ShuffleMapStage.

Broadcast Variable and Serialized Task Binary

ShuffleMapTask is given a broadcast variable with a reference to a serialized task binary (Broadcast[Array[Byte]]).

runTask expects that the serialized task binary is a tuple of an RDD and a ShuffleDependency.
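The round trip of the task binary can be illustrated with a minimal sketch. It uses plain Java serialization as a stand-in for the closure serializer, and FakeRdd and FakeShuffleDependency are hypothetical placeholders for the real RDD and ShuffleDependency, so it runs without Spark on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinarySketch {
  // Hypothetical stand-ins for an RDD and a ShuffleDependency (not Spark classes).
  final case class FakeRdd(id: Int)
  final case class FakeShuffleDependency(shuffleId: Int)

  // Producer side: turn the (rdd, dependency) pair into the bytes that would be broadcast.
  def serialize(pair: (FakeRdd, FakeShuffleDependency)): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(pair) finally out.close()
    bytes.toByteArray
  }

  // Task side: recover the pair from the bytes, as runTask expects to be able to do.
  def deserialize(taskBinary: Array[Byte]): (FakeRdd, FakeShuffleDependency) = {
    val in = new ObjectInputStream(new ByteArrayInputStream(taskBinary))
    try in.readObject().asInstanceOf[(FakeRdd, FakeShuffleDependency)]
    finally in.close()
  }
}
```

If the bytes held anything other than such a pair, the cast in deserialize would fail, which is why runTask depends on the binary having exactly this shape.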

Running Task

runTask(
  context: TaskContext): MapStatus

runTask writes the records produced by running the serialized task code over the RDD partition to the shuffle system and returns a MapStatus (with the BlockManager and an estimated size of the result shuffle blocks).

Internally, runTask requests the SparkEnv for a new instance of the closure serializer and requests it to deserialize the taskBinary (into a tuple of an RDD and a ShuffleDependency).

runTask measures the elapsed (wall-clock) time and the CPU time of the current thread spent on deserialization.
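One way to take both measurements around a block of work is sketched below, using only the JDK (System.nanoTime and ThreadMXBean). The timed helper and its return shape are assumptions made for illustration, not Spark's own code.

```scala
import java.lang.management.ManagementFactory

object DeserializationTimingSketch {
  // Runs `body` and returns (result, elapsed nanoseconds, CPU nanoseconds of the current thread).
  def timed[T](body: => T): (T, Long, Long) = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    // CPU time is reported as 0 when the JVM cannot measure it for the current thread.
    def cpuNow(): Long =
      if (threadMXBean.isCurrentThreadCpuTimeSupported) threadMXBean.getCurrentThreadCpuTime
      else 0L

    val startNs = System.nanoTime()
    val startCpu = cpuNow()
    val result = body
    val elapsedNs = System.nanoTime() - startNs
    val cpuNs = cpuNow() - startCpu
    (result, elapsedNs, cpuNs)
  }
}
```

The two numbers can differ noticeably: elapsed time includes any waiting (for example on I/O), while CPU time counts only the time the thread was actually running.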

runTask requests the SparkEnv for the ShuffleManager and requests it for a ShuffleWriter (for the ShuffleHandle, the RDD partition, and the TaskContext).

runTask then requests the RDD for the records (of the partition) that the ShuffleWriter is requested to write out (to the shuffle system).

In the end, runTask requests the ShuffleWriter to stop (with the success flag on) and returns the shuffle map output status.

This is the moment in the lifecycle of a task (and its corresponding RDD) when an RDD partition is computed and in turn becomes a sequence of records (i.e. real data) on an executor.

In case of any exceptions, runTask requests the ShuffleWriter to stop (with the success flag off) and (re)throws the exception.

runTask may also print out the following DEBUG message to the logs when the ShuffleWriter could not be stopped.

Could not stop writer

runTask is part of Task abstraction.
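The write, stop, and error-handling flow of runTask described above can be sketched without Spark as follows. Writer, Status, and CountingWriter are hypothetical, simplified stand-ins for ShuffleWriter and MapStatus, and the message is printed to standard error here rather than logged at DEBUG level.

```scala
object RunTaskSketch {
  // Hypothetical stand-in for MapStatus.
  final case class Status(recordsWritten: Int)

  // Hypothetical stand-in for ShuffleWriter.
  trait Writer[T] {
    def write(records: Iterator[T]): Unit
    def stop(success: Boolean): Option[Status]
  }

  def runTask[T](records: Iterator[T], writer: Writer[T]): Status =
    try {
      // Consuming the iterator is what actually computes the partition.
      writer.write(records)
      // Happy path: stop with the success flag on and return the status.
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          // Failure path: stop with the success flag off.
          writer.stop(success = false)
        } catch {
          case inner: Exception =>
            System.err.println(s"Could not stop writer: ${inner.getMessage}")
        }
        // The original exception is always rethrown.
        throw e
    }

  // In-memory writer that counts records and remembers how it was stopped.
  final class CountingWriter[T] extends Writer[T] {
    private var count = 0
    var stoppedWith: Option[Boolean] = None
    def write(records: Iterator[T]): Unit = records.foreach(_ => count += 1)
    def stop(success: Boolean): Option[Status] = {
      stoppedWith = Some(success)
      if (success) Some(Status(count)) else None
    }
  }
}
```

Note that a failure while stopping the writer is only reported and does not mask the exception that caused the task to fail in the first place.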

preferredLocations Method

preferredLocations: Seq[TaskLocation]

preferredLocations simply returns the preferredLocs internal property.

preferredLocations is part of Task abstraction.

Logging

Enable ALL logging level for org.apache.spark.scheduler.ShuffleMapTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.ShuffleMapTask=ALL

Refer to Logging.

Preferred Locations

preferredLocs holds the TaskLocations that are the unique entries of the given locs. When locs is not defined, preferredLocs is empty and the task has no location preferences.

preferredLocs is initialized when ShuffleMapTask is created.

preferredLocs is used exclusively when ShuffleMapTask is requested for the preferred locations.
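The rule for deriving the preferred locations can be sketched as below. Locations are modelled as plain strings instead of TaskLocation, "not defined" is assumed to mean a null reference, and keeping the first-seen order of entries is a choice of this sketch rather than a documented guarantee.

```scala
object PreferredLocationsSketch {
  // Unique entries of locs, or an empty sequence when locs is not defined (assumed: null).
  def preferredLocs(locs: Seq[String]): Seq[String] =
    if (locs == null) Nil else locs.distinct
}
```

For example, PreferredLocationsSketch.preferredLocs(Seq("host1", "host2", "host1")) yields the two hosts once each, and passing null yields an empty sequence, i.e. no location preferences.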