RDD — Description of Distributed Computation

RDD[T] is an abstraction of a fault-tolerant, resilient distributed dataset: a mere description of a computation over a distributed collection of records (of type T).
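
For example, defining an RDD (with transformations) merely builds the description; only an action triggers the computation it describes:

// Merely a description: defining the RDD (and its transformations) runs no job
val doubled = sc.parallelize(1 to 5).map(_ * 2)

// Only an action triggers the distributed computation the RDD describes
doubled.collect() // Array(2, 4, 6, 8, 10)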

Contract

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[T]

Computes the input Partition (with the TaskContext) to produce values (of type T).

Used when:

- RDD is requested to computeOrReadCheckpoint a partition

getPartitions

getPartitions: Array[Partition]

Used when:

- RDD is requested for the partitions
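
To illustrate the contract, here is a minimal sketch of a custom RDD (the RangeRDD name, the SlicePartition type, and the slicing logic are all illustrative, not part of Spark):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative Partition implementation: its index plus the slice bounds
case class SlicePartition(index: Int, start: Int, end: Int) extends Partition

// A hypothetical RDD distributing the numbers [0, n) across numSlices partitions
class RangeRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) { // Nil: no parent dependencies

  // Contract: describe the partitions of this RDD
  override protected def getPartitions: Array[Partition] = {
    val sliceSize = (n + numSlices - 1) / numSlices
    Array.tabulate[Partition](numSlices) { i =>
      SlicePartition(i, i * sliceSize, math.min((i + 1) * sliceSize, n))
    }
  }

  // Contract: produce the values of the given partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    split match {
      case SlicePartition(_, start, end) => Iterator.range(start, end)
    }
}

new RangeRDD(sc, 10, 3).collect() then yields the numbers 0 to 9.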

Implementations

Creating Instance

RDD takes the following to be created:

- SparkContext
- Parent Dependencies (Seq[Dependency[_]])

Abstract Class

RDD is an abstract class and cannot be created directly. It is created indirectly through one of its concrete implementations.

Stage-Level Scheduling

withResources

withResources(
  rp: ResourceProfile): this.type

withResources sets the given ResourceProfile as the resourceProfile and requests the ResourceProfileManager to add the resource profile.
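
A sketch of stage-level scheduling in action (the resource amounts are arbitrary, and running this for real requires a cluster manager that supports ResourceProfiles, e.g. YARN or Kubernetes with dynamic allocation):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build a custom ResourceProfile (amounts are arbitrary)
val execReqs = new ExecutorResourceRequests().cores(4).memory("8g")
val taskReqs = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// Attach the profile to an RDD; it applies to the stage(s) computing this RDD
val rdd = sc.range(0, 1000).withResources(rp)
assert(rdd.getResourceProfile == rp)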

resourceProfile

RDD uses the resourceProfile internal registry to hold a ResourceProfile.

The ResourceProfile is undefined when RDD is created and is assigned in withResources.

The ResourceProfile is available using getResourceProfile.

getResourceProfile

getResourceProfile(): ResourceProfile

getResourceProfile returns the resourceProfile (if defined) or null.

getResourceProfile is used when:

Preferred Locations (Placement Preferences of Partition)

preferredLocations(
  split: Partition): Seq[String]
Final Method

preferredLocations is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

preferredLocations requests the CheckpointRDD for the preferred locations of the given Partition if this RDD is checkpointed, or falls back to getPreferredLocations.


preferredLocations is a template method that uses getPreferredLocations, which custom RDDs can override to specify placement preferences of their own.
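
For example, the hypothetical RangeRDD from the Contract section could override getPreferredLocations to express placement preferences (the hostnames are illustrative only):

// Inside the hypothetical RangeRDD shown under Contract
override protected def getPreferredLocations(split: Partition): Seq[String] =
  Seq("host-" + (split.index % 2)) // illustrative hostnames only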


preferredLocations is used when:

- DAGScheduler is requested for the preferred locations of a partition

Partitions

partitions: Array[Partition]
Final Method

partitions is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

partitions requests the CheckpointRDD for the partitions if this RDD is checkpointed.

Otherwise, when this RDD is not checkpointed, partitions calls getPartitions (and caches the result in the partitions_ internal registry).

Note

getPartitions is an abstract method that custom RDDs are required to provide.


Partitions have the property that their internal index should be equal to their position in this RDD.
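
The property can be verified directly:

val rdd = sc.parallelize(1 to 10, numSlices = 4)
assert(rdd.partitions.zipWithIndex.forall { case (p, i) => p.index == i })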


partitions is used when:

dependencies

dependencies: Seq[Dependency[_]]
Final Method

dependencies is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.

dependencies branches off based on checkpointRDD (and availability of CheckpointRDD).

With CheckpointRDD available (this RDD is checkpointed), dependencies returns a OneToOneDependency with the CheckpointRDD.

Otherwise, when this RDD is not checkpointed, dependencies calls getDependencies (and caches the result in the dependencies_ internal registry).

Note

getDependencies defaults to the Dependencies this RDD was created with, and custom RDDs can override it to define their own dependencies.
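
For example, dependencies reveals whether an RDD sits behind a shuffle boundary:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _)

// A ShuffledRDD depends on its parent through a ShuffleDependency
println(counts.dependencies.map(_.getClass.getSimpleName))
// e.g. List(ShuffleDependency)

// A narrow transformation yields a OneToOneDependency
println(counts.map(identity).dependencies.map(_.getClass.getSimpleName))
// e.g. List(OneToOneDependency)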

Reliable Checkpointing

checkpoint(): Unit

checkpoint creates a new ReliableRDDCheckpointData (with this RDD) and saves it in checkpointData registry.

checkpoint does nothing when the checkpointData registry has already been defined.

checkpoint throws a SparkException when the checkpoint directory is not specified:

Checkpoint directory has not been set in the SparkContext
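
A typical reliable-checkpointing flow (the checkpoint directory is illustrative):

sc.setCheckpointDir("/tmp/spark-checkpoints") // illustrative directory; must be set first

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint() // merely creates the ReliableRDDCheckpointData; nothing runs yet
rdd.count()      // the first action triggers doCheckpoint and materializes the checkpoint

assert(rdd.isCheckpointed)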

RDDCheckpointData

RDD defines the checkpointData internal registry for an RDDCheckpointData[T] (with T being the type of this RDD).

The checkpointData registry is undefined (None) when RDD is created and can be assigned the following values:

- ReliableRDDCheckpointData when checkpoint is executed
- LocalRDDCheckpointData when localCheckpoint is executed

Used when:

- RDD is requested to checkpoint, doCheckpoint and for the checkpointRDD

CheckpointRDD

checkpointRDD: Option[CheckpointRDD[T]]

checkpointRDD returns the CheckpointRDD of the RDDCheckpointData (if defined, i.e. this RDD is checkpointed).

checkpointRDD is used when:

- RDD is requested for the dependencies, partitions and preferred locations

doCheckpoint

doCheckpoint(): Unit

doCheckpoint executes in checkpoint scope.

doCheckpoint turns the doCheckpointCalled flag on (to prevent multiple executions).

doCheckpoint branches off based on whether a RDDCheckpointData is defined or not:

  1. With the RDDCheckpointData defined, doCheckpoint checks the checkpointAllMarkedAncestors flag and, if enabled, first requests the RDDs of the Dependencies to doCheckpoint themselves. doCheckpoint then requests the RDDCheckpointData to checkpoint.

  2. With the RDDCheckpointData undefined, doCheckpoint requests the RDDs of the Dependencies to doCheckpoint themselves.

In other words, with the RDDCheckpointData defined, requesting the Dependencies to doCheckpoint is guarded by the checkpointAllMarkedAncestors flag.

doCheckpoint skips execution if called earlier.

doCheckpoint is used when:

- SparkContext is requested to run a job

iterator

iterator(
  split: Partition,
  context: TaskContext): Iterator[T]

iterator uses getOrCompute for the split partition when the StorageLevel is not NONE (this RDD is to be cached or persisted). Otherwise, iterator falls back to computeOrReadCheckpoint.

Final Method

iterator is a final method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.

getOrCompute

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute requests the BlockManager to look up the partition (as an RDD block) or compute and persist it if not found. See Getting Or Computing RDD Partition below for details.

computeOrReadCheckpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint reads the split partition from the checkpoint (the parent CheckpointRDD) if this RDD is checkpointed and materialized. Otherwise, computeOrReadCheckpoint computes the split partition itself.

Debugging Recursive Dependencies

toDebugString: String

toDebugString returns a description of the RDD lineage graph (this RDD with its recursive dependencies).

val wordCount = sc.textFile("README.md")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

scala> println(wordCount.toDebugString)
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[20] at map at <console>:24 []
    |  MapPartitionsRDD[19] at flatMap at <console>:24 []
    |  README.md MapPartitionsRDD[18] at textFile at <console>:24 []
    |  README.md HadoopRDD[17] at textFile at <console>:24 []

toDebugString uses indentations to indicate a shuffle boundary.

The numbers in round brackets show the level of parallelism at each stage, e.g. (2) in the above output.

scala> println(wordCount.getNumPartitions)
2

With spark.logLineage enabled, toDebugString is printed out when executing an action.

$ ./bin/spark-shell --conf spark.logLineage=true

scala> sc.textFile("README.md", 4).count
...
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD's recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
 |  README.md HadoopRDD[0] at textFile at <console>:25 []

Implicit Methods

rddToOrderedRDDFunctions

rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]

rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.

rddToOrderedRDDFunctions is used (implicitly) when:

- Key-ordered transformations of OrderedRDDFunctions (e.g. sortByKey, filterByRange, repartitionAndSortWithinPartitions) are used on an RDD of key-value pairs
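
For example, sortByKey is not defined on RDD itself; it resolves through this implicit conversion:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// sortByKey comes from OrderedRDDFunctions, made available by the implicit
val sorted = pairs.sortByKey()
println(sorted.collect().toSeq) // (a,1), (b,2), (c,3)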

Review Me

StorageLevel

RDD can have a StorageLevel specified. The default StorageLevel is NONE.

storageLevel can be specified using the persist method.

storageLevel becomes NONE again after unpersist.

The current StorageLevel is available using the getStorageLevel method.

getStorageLevel: StorageLevel
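
A quick demonstration of the StorageLevel lifecycle:

import org.apache.spark.storage.StorageLevel

val rdd = sc.range(0, 10)
assert(rdd.getStorageLevel == StorageLevel.NONE) // the default

rdd.persist(StorageLevel.MEMORY_ONLY)
assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)

rdd.unpersist()
assert(rdd.getStorageLevel == StorageLevel.NONE) // NONE again after unpersist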

Unique Identifier

id: Int

id is a unique identifier (aka RDD ID) in the given SparkContext.

id requests the SparkContext for a newRddId right when RDD is created.

Barrier Stage

An RDD can be part of a barrier stage. By default, the isBarrier flag is enabled (true) when:

1. There are no ShuffleDependencies among the RDD dependencies

2. There is at least one parent RDD that has the flag enabled

ShuffledRDD has the flag always disabled.

MapPartitionsRDD is the only RDD that can have the flag enabled.
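
Barrier execution is requested with RDD.barrier, e.g. (running this for real requires enough concurrent task slots for all tasks of the stage):

import org.apache.spark.BarrierTaskContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// barrier() gives an RDDBarrier; mapPartitions on it yields a MapPartitionsRDD
// with the isBarrier flag enabled
val barrierRdd = rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier() // all tasks of the barrier stage wait for one another here
  iter
}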

Getting Or Computing RDD Partition

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute creates a RDDBlockId (with the id of this RDD and the partition index).

getOrCompute requests the BlockManager to getOrElseUpdate for the block ID (with the StorageLevel and the makeIterator function).

Note

getOrCompute uses SparkEnv to access the current BlockManager.

getOrCompute records whether...FIXME (readCachedBlock)

getOrCompute branches off per the response from the BlockManager and whether the internal readCachedBlock flag is now on or still off. In either case, getOrCompute creates an InterruptibleIterator.

Note

InterruptibleIterator simply delegates to a wrapped internal Iterator, but allows for task killing functionality.

For a BlockResult available and the readCachedBlock flag on, getOrCompute...FIXME

For a BlockResult available and the readCachedBlock flag off, getOrCompute...FIXME

Note

The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and BlockManager.getOrElseUpdate gives a Left value with the BlockResult).

For Right(iter) (regardless of the value of the readCachedBlock flag since...FIXME), getOrCompute...FIXME

Note

BlockManager.getOrElseUpdate gives a Right(iter) value to indicate an error with a block.

Note

getOrCompute is used on Spark executors.

Note

getOrCompute is used exclusively when RDD is requested for the iterator.

isCheckpointedAndMaterialized Method

isCheckpointedAndMaterialized: Boolean

isCheckpointedAndMaterialized...FIXME

isCheckpointedAndMaterialized is used when RDD is requested for isCheckpointed and to computeOrReadCheckpoint.

getNarrowAncestors Method

getNarrowAncestors: Seq[RDD[_]]

getNarrowAncestors...FIXME

getNarrowAncestors is used when StageInfo is requested to fromStage.

Persisting RDD

persist(): this.type
persist(
  newLevel: StorageLevel): this.type

Refer to Persisting RDD.

persist Internal Method

persist(
  newLevel: StorageLevel,
  allowOverride: Boolean): this.type

persist...FIXME

persist (private) is used when RDD is requested to persist and localCheckpoint.

Computing Partition or Reading From Checkpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint reads the split partition from a checkpoint (if this RDD is checkpointed and materialized) or computes the partition itself.

computeOrReadCheckpoint is used when RDD is requested to iterator or getOrCompute.