RDD — Description of Distributed Computation

RDD[T] is an abstraction of fault-tolerant, resilient distributed datasets: mere descriptions of computations over a distributed collection of records (of type T).

Contract

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[T]

Computes the input Partition (with the TaskContext) to produce values (of type T).

Used when:

getPartitions

getPartitions: Array[Partition]

Defines the Partitions of this RDD (computed once by partitions and then cached).

Used when:
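
Together, these two methods make up the RDD contract. A minimal sketch of a custom RDD (RangeLikeRDD and SimplePartition are made-up names for illustration):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A trivial Partition that carries only its index
class SimplePartition(override val index: Int) extends Partition

// A hypothetical RDD that produces 0 until n, striped across slices partitions
class RangeLikeRDD(sc: SparkContext, n: Int, slices: Int)
  extends RDD[Int](sc, Nil) { // Nil = no parent dependencies

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](slices)(new SimplePartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    (split.index until n by slices).iterator
}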

Implementations

Creating Instance

RDD takes the following to be created:

  1. SparkContext
  2. Dependencies (parent RDDs)

Abstract Class

RDD is an abstract class and cannot be created directly. It can only be created indirectly, through one of the concrete RDDs (e.g. MapPartitionsRDD, ShuffledRDD).

Barrier RDD

A barrier RDD is an RDD with the isBarrier flag enabled.

ShuffledRDD can never be a barrier RDD as it overrides the isBarrier method to be always disabled (false).

isBarrier

isBarrier(): Boolean

isBarrier is the value of isBarrier_.


isBarrier is used when:

isBarrier_

isBarrier_ : Boolean // (1)!
  1. @transient protected lazy val

isBarrier_ is enabled (true) when there is at least one barrier RDD among the parent RDDs (excluding the RDDs of ShuffleDependencies).
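
The gist of the evaluation, sketched after the Spark sources:

@transient protected lazy val isBarrier_ : Boolean =
  dependencies
    .filter(!_.isInstanceOf[ShuffleDependency[_, _, _]])
    .exists(_.rdd.isBarrier())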

Note

isBarrier_ is overridden by PythonRDD and MapPartitionsRDD, which both accept an isFromBarrier flag.

ResourceProfile (Stage-Level Scheduling)

RDD can be assigned a ResourceProfile using RDD.withResources method.

val rdd: RDD[_] = ...
rdd
  .withResources(...) // request resources for a computation
  .mapPartitions(...) // the computation

RDD uses the resourceProfile internal registry to hold the ResourceProfile. It is undefined initially.

The ResourceProfile is available using RDD.getResourceProfile method.
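
A ResourceProfile is typically built with a ResourceProfileBuilder. A sketch (the resource amounts are arbitrary):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4).memory("4g"))
  .require(new TaskResourceRequests().cpus(2))
  .build()

rdd
  .withResources(rp)
  .mapPartitions(identity)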

withResources

withResources(
  rp: ResourceProfile): this.type

withResources sets the given ResourceProfile as the resourceProfile and requests the ResourceProfileManager to add the resource profile.

getResourceProfile

getResourceProfile(): ResourceProfile

getResourceProfile returns the resourceProfile (if defined) or null.


getResourceProfile is used when:

Preferred Locations (Placement Preferences of Partition)

preferredLocations(
  split: Partition): Seq[String]
Final Method

preferredLocations is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

preferredLocations requests the CheckpointRDD for the preferred locations of the given Partition if this RDD is checkpointed, or falls back on getPreferredLocations.


preferredLocations is a template method that uses getPreferredLocations, which custom RDDs can override to specify placement preferences of their own.
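
The template, sketched after the Spark sources:

final def preferredLocations(split: Partition): Seq[String] =
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }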


preferredLocations is used when:

Partitions

partitions: Array[Partition]
Final Method

partitions is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

partitions requests the CheckpointRDD for the partitions if this RDD is checkpointed.

Otherwise, when this RDD is not checkpointed, partitions requests getPartitions (and caches the result in the partitions_ internal registry).

Note

getPartitions is an abstract method that custom RDDs are required to provide.


Partitions have the property that their internal index is equal to their position in this RDD.
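
Put together, partitions looks roughly as follows (a sketch after the Spark sources; synchronization omitted):

final def partitions: Array[Partition] =
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      // enforce the index-equals-position property
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index)
      }
    }
    partitions_
  }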


partitions is used when:

dependencies

dependencies: Seq[Dependency[_]]
Final Method

dependencies is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

dependencies branches off based on the availability of the checkpointRDD (a CheckpointRDD).

With CheckpointRDD available (this RDD is checkpointed), dependencies returns a OneToOneDependency with the CheckpointRDD.

Otherwise, when this RDD is not checkpointed, dependencies requests getDependencies (and caches the result in the dependencies_ internal registry).
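
The branching, sketched after the Spark sources (synchronization omitted):

final def dependencies: Seq[Dependency[_]] =
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }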

Note

getDependencies is an abstract method that custom RDDs are required to provide.

Reliable Checkpointing

checkpoint(): Unit

checkpoint creates a new ReliableRDDCheckpointData (with this RDD) and saves it in checkpointData registry.

checkpoint does nothing when the checkpointData registry has already been defined.

checkpoint throws a SparkException when the checkpoint directory is not specified:

Checkpoint directory has not been set in the SparkContext
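
A minimal usage example (the checkpoint directory is arbitrary):

sc.setCheckpointDir("/tmp/checkpoints") // required before checkpoint()

val nums = sc.parallelize(0 until 100)
nums.checkpoint() // marks the RDD for reliable checkpointing
nums.count()      // checkpointing happens when the first job finishes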

RDDCheckpointData

RDD defines the checkpointData internal registry for an RDDCheckpointData[T] (with T being the type of this RDD).

The checkpointData registry is undefined (None) when an RDD is created and can become one of the following:

  1. ReliableRDDCheckpointData (for checkpoint)
  2. LocalRDDCheckpointData (for localCheckpoint)

Used when:

CheckpointRDD

checkpointRDD: Option[CheckpointRDD[T]]

checkpointRDD returns the CheckpointRDD of the RDDCheckpointData (if defined, which means this RDD is checkpointed).

checkpointRDD is used when:

doCheckpoint

doCheckpoint(): Unit

doCheckpoint executes in checkpoint scope.

doCheckpoint turns the doCheckpointCalled flag on (to prevent multiple executions).

doCheckpoint branches off based on whether a RDDCheckpointData is defined or not:

  1. With the RDDCheckpointData defined, doCheckpoint checks the checkpointAllMarkedAncestors flag. If enabled, doCheckpoint first requests the RDDs of the Dependencies to doCheckpoint themselves. doCheckpoint then requests the RDDCheckpointData to checkpoint.

  2. With the RDDCheckpointData undefined, doCheckpoint requests the RDDs of the Dependencies to doCheckpoint themselves.

In other words, with the RDDCheckpointData defined, requesting the Dependencies to doCheckpoint is guarded by the checkpointAllMarkedAncestors flag.

doCheckpoint skips execution if called earlier.
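
The control flow, sketched after the Spark sources:

if (!doCheckpointCalled) {
  doCheckpointCalled = true
  if (checkpointData.isDefined) {
    if (checkpointAllMarkedAncestors) {
      // checkpoint the marked ancestors first
      dependencies.foreach(_.rdd.doCheckpoint())
    }
    checkpointData.get.checkpoint()
  } else {
    dependencies.foreach(_.rdd.doCheckpoint())
  }
}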


doCheckpoint is used when:

iterator

iterator(
  split: Partition,
  context: TaskContext): Iterator[T]

iterator...FIXME

Final Method

iterator is a final method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.
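
In short (a sketch after the Spark sources): iterator goes through the cache when a storage level is set and computes (or reads a checkpoint) otherwise:

final def iterator(
    split: Partition,
    context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }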

getOrCompute

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute...FIXME
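
In essence, getOrCompute delegates to the BlockManager to look up or store the partition's block, recomputing it when needed. A simplified sketch (exact signatures vary across Spark versions):

val blockId = RDDBlockId(id, partition.index)
SparkEnv.get.blockManager.getOrElseUpdate(
    blockId, storageLevel, elementClassTag,
    () => computeOrReadCheckpoint(partition, context)) match {
  case Left(blockResult) =>
    // found in (or just stored to) the block manager
    new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
  case Right(iter) =>
    // could not be stored; serve the freshly computed iterator
    new InterruptibleIterator(context, iter)
}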

computeOrReadCheckpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint...FIXME
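
In short (a sketch after the Spark sources): with this RDD checkpointed and materialized, the partition is read from the first parent (the CheckpointRDD); otherwise compute runs:

private[spark] def computeOrReadCheckpoint(
    split: Partition,
    context: TaskContext): Iterator[T] =
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }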

Debugging Recursive Dependencies

toDebugString: String

toDebugString returns an RDD Lineage Graph.

val wordCount = sc.textFile("README.md")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

scala> println(wordCount.toDebugString)
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[20] at map at <console>:24 []
    |  MapPartitionsRDD[19] at flatMap at <console>:24 []
    |  README.md MapPartitionsRDD[18] at textFile at <console>:24 []
    |  README.md HadoopRDD[17] at textFile at <console>:24 []

toDebugString uses indentation to indicate shuffle boundaries.

The numbers in round brackets show the level of parallelism at each stage, e.g. (2) in the above output.

scala> println(wordCount.getNumPartitions)
2

With spark.logLineage enabled, toDebugString is printed out when executing an action.

$ ./bin/spark-shell --conf spark.logLineage=true

scala> sc.textFile("README.md", 4).count
...
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD's recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
 |  README.md HadoopRDD[0] at textFile at <console>:25 []

coalesce

coalesce(
  numPartitions: Int,
  shuffle: Boolean = false,
  partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  (implicit ord: Ordering[T] = null): RDD[T]

coalesce...FIXME
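
A quick example of the shuffle flag's effect:

val nums = sc.parallelize(0 until 100, numSlices = 8)

nums.coalesce(2).getNumPartitions                  // 2 (no shuffle)
nums.coalesce(16).getNumPartitions                 // still 8; growing requires a shuffle
nums.coalesce(16, shuffle = true).getNumPartitions // 16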


coalesce is used when:

Implicit Methods

rddToOrderedRDDFunctions

rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]

rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.
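
For example, sortByKey is not defined on RDD itself; it becomes available on an RDD of pairs through this conversion (defined on the RDD companion object, so it is in scope automatically):

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// sortByKey lives in OrderedRDDFunctions; the call compiles because
// rddToOrderedRDDFunctions converts RDD[(String, Int)] implicitly
pairs.sortByKey().collect // Array((a,1), (b,2), (c,3))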

rddToOrderedRDDFunctions is used (implicitly) when: