
RDD — Description of Distributed Computation

RDD[T] is an abstraction of fault-tolerant resilient distributed datasets that are mere descriptions of computations over a distributed collection of records (of type T).

Contract

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[T]

Computes the input Partition (with the TaskContext) to produce values (of type T)


Partitions

getPartitions: Array[Partition]

Partitions of this RDD

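The following is a minimal sketch of a custom RDD that fulfills the contract. The class name, the partition type and the numbers are made up for illustration only.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: a sub-range of integers
case class RangePartition(index: Int, from: Int, until: Int) extends Partition

// Hypothetical RDD describing the numbers 0 until 100 in two partitions
class RangeishRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {

  // Partitions of this RDD (a Partition's index should match its position in the array)
  override protected def getPartitions: Array[Partition] =
    Array[Partition](RangePartition(0, 0, 50), RangePartition(1, 50, 100))

  // Computes the given Partition (with the TaskContext) to produce values (of type Int)
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.from until p.until).iterator
  }
}

Nothing is computed until an action is executed, e.g. new RangeishRDD(sc).collect().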

Implementations

Creating Instance

RDD takes the following to be created:

  1. SparkContext
  2. Parent Dependencies (Seq[Dependency[_]])

Abstract Class

RDD is an abstract class and cannot be created directly. It is created indirectly through the concrete RDDs.

Barrier RDD

A barrier RDD is an RDD with the isBarrier flag enabled.

ShuffledRDD can never be a barrier RDD as it overrides the isBarrier method to always be disabled (false).
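For illustration, a barrier RDD is typically the result of RDD.barrier followed by a mapPartitions-like transformation (the computation below is a placeholder):

// Barrier execution mode: all tasks of the resulting stage are launched together
val barrierRdd = sc
  .parallelize(1 to 10, numSlices = 2)
  .barrier()                   // RDDBarrier[Int]
  .mapPartitions(_.map(_ + 1)) // a MapPartitionsRDD with the isBarrier flag enabled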

isBarrier

isBarrier(): Boolean

isBarrier is the value of isBarrier_.



isBarrier_

isBarrier_ : Boolean // (1)!
  1. @transient protected lazy val

isBarrier_ is enabled (true) when there is at least one barrier RDD among the parent RDDs (excluding ShuffleDependencies).

Note

isBarrier_ is overridden by PythonRDD and MapPartitionsRDD, which both accept an isFromBarrier flag.

ResourceProfile (Stage-Level Scheduling)

RDD can be assigned a ResourceProfile using the RDD.withResources method.

val rdd: RDD[_] = ...
rdd
  .withResources(...) // request resources for a computation
  .mapPartitions(...) // the computation

RDD uses the resourceProfile internal registry to hold the ResourceProfile. It is undefined initially.

The ResourceProfile is available using the RDD.getResourceProfile method.
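The following is a sketch (not a complete recipe) of building a ResourceProfile with ResourceProfileBuilder and attaching it to a computation. The resource amounts are made up, and stage-level scheduling additionally requires a cluster manager and configuration that support it.

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Describe the executor and task resources the computation needs (values are illustrative)
val profile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4).memory("2g"))
  .require(new TaskResourceRequests().cpus(2))
  .build()

val processed = sc
  .parallelize(1 to 1000, numSlices = 8)
  .withResources(profile)      // request resources for the computation
  .mapPartitions(_.map(_ * 2)) // the computation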

withResources

withResources(
  rp: ResourceProfile): this.type

withResources sets the given ResourceProfile as the resourceProfile and requests the ResourceProfileManager to add the resource profile.

getResourceProfile

getResourceProfile(): ResourceProfile

getResourceProfile returns the resourceProfile (if defined) or null.



Preferred Locations (Placement Preferences of Partition)

preferredLocations(
  split: Partition): Seq[String]
Final Method

preferredLocations is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

preferredLocations requests the CheckpointRDD for the preferred locations of the given Partition if this RDD is checkpointed. Otherwise, preferredLocations uses getPreferredLocations.


preferredLocations is a template method that uses getPreferredLocations, which custom RDDs can override to specify placement preferences of their own.


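A sketch of a custom RDD that overrides getPreferredLocations (the class, the hosts and the records are made up):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that knows the host its records live on
case class HostPartition(index: Int, host: String) extends Partition

class HostAwareRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (host, i) => HostPartition(i, host): Partition }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"records of partition ${split.index}")

  // Placement preference: prefer running each partition's task on its host
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)
}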

Partitions

partitions: Array[Partition]
Final Method

partitions is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

partitions requests the CheckpointRDD for the partitions if this RDD is checkpointed.

Otherwise, when this RDD is not checkpointed, partitions uses getPartitions (and caches the result in the partitions_ registry).

Note

getPartitions is an abstract method that custom RDDs are required to provide.


The partitions have the property that their index is equal to their position in this RDD.
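For example:

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// A Partition's index equals its position in the partitions array
assert(rdd.partitions.zipWithIndex.forall { case (p, i) => p.index == i })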



dependencies

dependencies: Seq[Dependency[_]]
Final Method

dependencies is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

dependencies branches off based on checkpointRDD (and availability of CheckpointRDD).

With CheckpointRDD available (this RDD is checkpointed), dependencies returns a OneToOneDependency with the CheckpointRDD.

Otherwise, when this RDD is not checkpointed, dependencies uses getDependencies (and caches the result in the dependencies_ registry).

Note

getDependencies is a method that custom RDDs can override to define their dependencies (by default, the dependencies this RDD was created with).

Reliable Checkpointing

checkpoint(): Unit

Public API

checkpoint is part of the public API.

Procedure

checkpoint is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

checkpoint creates a new ReliableRDDCheckpointData (with this RDD) and saves it in checkpointData registry.

checkpoint does nothing when the checkpointData registry has already been defined.

checkpoint throws a SparkException when the checkpoint directory is not specified:

Checkpoint directory has not been set in the SparkContext
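A minimal usage sketch (the checkpoint directory is made up):

// The checkpoint directory has to be set before checkpoint is called
sc.setCheckpointDir("/tmp/spark-checkpoints")

val nums = sc.parallelize(1 to 100).map(_ * 2)
nums.checkpoint() // only registers ReliableRDDCheckpointData; nothing is saved yet
nums.count()      // the first job triggers doCheckpoint and materializes the checkpoint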

RDDCheckpointData

checkpointData: Option[RDDCheckpointData[T]]

RDD defines checkpointData internal registry for a RDDCheckpointData[T] (of T type of this RDD).

The checkpointData registry is undefined (None) initially when this RDD is created and can hold a value after the following RDD API operators:

RDD Operator          RDDCheckpointData
RDD.checkpoint        ReliableRDDCheckpointData
RDD.localCheckpoint   LocalRDDCheckpointData


CheckpointRDD

checkpointRDD: Option[CheckpointRDD[T]]

checkpointRDD returns the CheckpointRDD of the RDDCheckpointData (if defined, i.e. when this RDD is checkpointed).



doCheckpoint

doCheckpoint(): Unit

RDD.doCheckpoint, SparkContext.runJob and Dataset.checkpoint

doCheckpoint is called every time a Spark job is submitted (using SparkContext.runJob).

I found it quite interesting at the very least.

doCheckpoint is also triggered when the Dataset.checkpoint operator (Spark SQL) is executed (with the eager flag enabled), which will likely trigger one or more Spark jobs on the underlying RDD anyway.

Procedure

doCheckpoint is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

Does nothing unless checkpointData is defined

My understanding is that doCheckpoint does nothing (noop) unless the RDDCheckpointData is defined.

doCheckpoint executes all the following in checkpoint scope.

doCheckpoint turns the doCheckpointCalled flag on (to prevent multiple executions).

doCheckpoint branches off based on whether a RDDCheckpointData is defined or not:

  1. With the RDDCheckpointData defined, doCheckpoint checks the checkpointAllMarkedAncestors flag and, if enabled, first requests the RDDs of the Dependencies to doCheckpoint themselves. doCheckpoint then requests the RDDCheckpointData to checkpoint.

  2. With the RDDCheckpointData undefined, doCheckpoint requests the Dependencies (of this RDD) for their RDDs that are in turn requested to doCheckpoint themselves (recursively).

Note

With the RDDCheckpointData defined, requesting doCheckpoint of the Dependencies is guarded by the checkpointAllMarkedAncestors flag.

doCheckpoint skips execution if called earlier.

CheckpointRDD

A CheckpointRDD is not checkpointed again (and does nothing when requested to do so).



iterator

iterator(
  split: Partition,
  context: TaskContext): Iterator[T]

iterator gets or computes the given Partition: with a storage level other than NONE, iterator uses getOrCompute (to use or populate the cache); otherwise, iterator uses computeOrReadCheckpoint.

Final Method

iterator is a final method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.

getOrCompute

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute looks up the RDD block (of this RDD and the given partition) in the BlockManager, computing the partition (computeOrReadCheckpoint) and persisting it per the storage level when the block is not available.

computeOrReadCheckpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint reads the partition from the checkpoint (using the iterator of the parent CheckpointRDD) when this RDD is checkpointed and materialized. Otherwise, computeOrReadCheckpoint computes the partition.

Debugging Recursive Dependencies

toDebugString: String

toDebugString returns an RDD Lineage Graph (a description of this RDD and its recursive dependencies).

val wordCount = sc.textFile("README.md")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

scala> println(wordCount.toDebugString)
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[20] at map at <console>:24 []
    |  MapPartitionsRDD[19] at flatMap at <console>:24 []
    |  README.md MapPartitionsRDD[18] at textFile at <console>:24 []
    |  README.md HadoopRDD[17] at textFile at <console>:24 []

toDebugString uses indentations to indicate a shuffle boundary.

The numbers in round brackets show the level of parallelism at each stage, e.g. (2) in the above output.

scala> println(wordCount.getNumPartitions)
2

With spark.logLineage enabled, toDebugString is printed out when executing an action.

$ ./bin/spark-shell --conf spark.logLineage=true

scala> sc.textFile("README.md", 4).count
...
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD's recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
 |  README.md HadoopRDD[0] at textFile at <console>:25 []

coalesce

coalesce(
  numPartitions: Int,
  shuffle: Boolean = false,
  partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  (implicit ord: Ordering[T] = null): RDD[T]

coalesce returns a new RDD with (at most) numPartitions partitions: a CoalescedRDD over this RDD when shuffle is disabled, or a CoalescedRDD over an intermediate ShuffledRDD (that redistributes the records evenly) when shuffle is enabled.
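A usage sketch:

val data = sc.parallelize(1 to 1000, numSlices = 8)

// Shrink to 2 partitions without a shuffle (narrow dependency)
val narrowed = data.coalesce(2)

// Shrink to 2 partitions with a shuffle (records are redistributed across the new partitions)
val rebalanced = data.coalesce(2, shuffle = true)

println(narrowed.getNumPartitions)   // 2
println(rebalanced.getNumPartitions) // 2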



Implicit Methods

rddToOrderedRDDFunctions

rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]

rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.

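For example, sortByKey (of OrderedRDDFunctions) becomes available on an RDD of key-value pairs through this conversion:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// sortByKey is not defined on RDD itself; it comes from OrderedRDDFunctions
val sorted = pairs.sortByKey()
println(sorted.collect().mkString(", ")) // (a,1), (b,2), (c,3)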

withScope

withScope[U](
  body: => U): U

withScope executes the given body in a named operation scope, using RDDOperationScope.withScope with the SparkContext of this RDD.

Note

withScope is used for most (if not all) RDD API operators.

mapPartitionsWithEvaluator

mapPartitionsWithEvaluator[U: ClassTag](
  evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U]

mapPartitionsWithEvaluator creates a MapPartitionsWithEvaluatorRDD for this RDD and the given PartitionEvaluatorFactory.
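A sketch, assuming the PartitionEvaluator and PartitionEvaluatorFactory developer API; the factory below is made up:

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Hypothetical factory: doubles every record of a partition
class DoublingEvaluatorFactory extends PartitionEvaluatorFactory[Int, Int] {
  override def createEvaluator(): PartitionEvaluator[Int, Int] =
    new PartitionEvaluator[Int, Int] {
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Int] =
        inputs.head.map(_ * 2)
    }
}

val doubled = sc
  .parallelize(1 to 10, numSlices = 2)
  .mapPartitionsWithEvaluator(new DoublingEvaluatorFactory)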

zipPartitionsWithEvaluator

zipPartitionsWithEvaluator[U: ClassTag](
  rdd2: RDD[T],
  evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U]

zipPartitionsWithEvaluator creates a ZippedPartitionsWithEvaluatorRDD for this RDD, the given RDD and the given PartitionEvaluatorFactory.