RDD — Description of Distributed Computation

RDD is a description of a fault-tolerant and resilient computation over a possibly distributed collection of records (of type T).

RDD Contract

Computing Partition (in TaskContext)

compute(
  split: Partition,
  context: TaskContext): Iterator[T]

compute computes the given split partition in the given TaskContext to produce an iterator over the records (of type T).

compute is implemented by every concrete RDD in Spark and is called every time the records are requested, unless the RDD is cached or checkpointed (in which case the records are read from storage instead, this time closer to the compute node).

When an RDD is cached, for specified storage levels (i.e. all but NONE)…​FIXME

compute runs on executors (inside tasks, not on the driver).

compute is used when RDD is requested to computeOrReadCheckpoint.

Partitions

getPartitions: Array[Partition]

getPartitions is used when RDD is requested for the partitions (called only once as the value is cached afterwards).

Dependencies

getDependencies: Seq[Dependency[_]]

getDependencies is used when RDD is requested for the dependencies (called only once as the value is cached afterwards).

Preferred Locations (Placement Preferences)

getPreferredLocations(
  split: Partition): Seq[String] = Nil

getPreferredLocations is used when RDD is requested for the preferred locations of a given partition.

Partitioner

partitioner: Option[Partitioner] = None

RDD can have a Partitioner defined.
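
To make the contract concrete, below is a minimal sketch of a custom RDD (the RangeLikeRDD and SlicePartition names are hypothetical) that serves the integers 0 until n split across numSlices partitions:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type; index must equal its position in the owning RDD
case class SlicePartition(index: Int) extends Partition

// Hypothetical RDD that serves the integers 0 until n across numSlices partitions
class RangeLikeRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent dependencies

  // Partitions: requested once and cached by RDD afterwards
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => SlicePartition(i))

  // Computing Partition: produces the records of one partition lazily
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val start = split.index * n / numSlices
    val end = (split.index + 1) * n / numSlices
    Iterator.range(start, end)
  }
}

The remaining contract methods keep their defaults here: getDependencies gives the (empty) dependencies from the constructor, getPreferredLocations gives Nil, and partitioner is None.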

(Subset of) Available RDDs

CoGroupedRDD: Result of cogroup-like transformations (e.g. cogroup)

CoalescedRDD: Result of the repartition or coalesce transformations

HadoopRDD: Allows for reading data stored in HDFS using the older MapReduce API. The most notable use case is the RDD returned by SparkContext.textFile.

MapPartitionsRDD: Result of calling map-like operations (e.g. map, flatMap, filter, mapPartitions)

ParallelCollectionRDD: Result of SparkContext.parallelize

ShuffledRDD: Result of "shuffle" operators (e.g. repartition or coalesce with shuffle enabled)

Creating Instance

RDD takes the following to be created:

  1. SparkContext

  2. Dependencies (parent RDDs)

RDD is an abstract class and cannot be created directly. It is created indirectly through its concrete subclasses.

StorageLevel

RDD can have a StorageLevel specified. The default StorageLevel is NONE.

storageLevel can be specified using the persist method.

storageLevel becomes NONE again after unpersisting.

The current StorageLevel is available using getStorageLevel method.

getStorageLevel: StorageLevel
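
For example (output abridged):

scala> import org.apache.spark.storage.StorageLevel

scala> val rdd = sc.parallelize(0 to 9)

scala> rdd.persist(StorageLevel.MEMORY_ONLY).getStorageLevel
res0: StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

scala> rdd.unpersist().getStorageLevel == StorageLevel.NONE
res1: Boolean = true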

Unique Identifier

id: Int

id is a unique identifier (aka RDD ID) in the given SparkContext.

id requests the SparkContext for a newRddId right when the RDD is created.
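
For example, in a fresh spark-shell session (the IDs simply increase monotonically as RDDs are created):

scala> sc.parallelize(0 to 9).id
res0: Int = 0

scala> sc.parallelize(0 to 9).id
res1: Int = 1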

Barrier Stage

An RDD can be part of a barrier stage. By default, the isBarrier flag is enabled (true) when:

  1. There are no ShuffleDependencies among the RDD dependencies

  2. There is at least one parent RDD that has the flag enabled

ShuffledRDD always has the flag disabled.

MapPartitionsRDD is the only RDD that can have the flag enabled.
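
For illustration, a barrier MapPartitionsRDD can be created with the barrier execution mode API (available since Spark 2.4); a minimal sketch:

val rdd = sc.parallelize(0 to 9, 2)

// RDDBarrier.mapPartitions gives a MapPartitionsRDD with the isBarrier flag enabled
val barrierRdd = rdd.barrier().mapPartitions(it => it)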

Getting Or Computing RDD Partition

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute creates an RDDBlockId for the RDD id and the partition index.

getOrCompute requests the BlockManager to getOrElseUpdate for the block ID (with the storage level and the makeIterator function).

getOrCompute uses SparkEnv to access the current BlockManager.

getOrCompute records whether…​FIXME (readCachedBlock)

getOrCompute branches off based on the response from the BlockManager and on whether the internal readCachedBlock flag is on or off. In either case, getOrCompute creates an InterruptibleIterator.

InterruptibleIterator simply delegates to a wrapped internal Iterator, but allows for task killing functionality.

For a BlockResult available and readCachedBlock flag on, getOrCompute…​FIXME

For a BlockResult available and readCachedBlock flag off, getOrCompute…​FIXME

The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and BlockManager.getOrElseUpdate gives a Left value with the BlockResult).

For Right(iter) (regardless of the value of readCachedBlock flag since…​FIXME), getOrCompute…​FIXME

BlockManager.getOrElseUpdate gives a Right(iter) value when the block could not be stored, in which case the computed records are passed on without caching.
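
The following is a condensed sketch of the branching (simplified from the Spark sources; metrics bookkeeping omitted):

// inside getOrCompute; simplified
val blockId = RDDBlockId(id, partition.index)
SparkEnv.get.blockManager.getOrElseUpdate(
    blockId, storageLevel, elementClassTag,
    () => computeOrReadCheckpoint(partition, context)) match {
  case Left(blockResult) =>
    // the block was found or has just been stored; wrap its records
    new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
  case Right(iter) =>
    // the block could not be stored; use the computed records directly
    new InterruptibleIterator(context, iter)
}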
getOrCompute is used on Spark executors.

getOrCompute is used exclusively when RDD is requested for the iterator over values in a partition.

RDD Dependencies

dependencies: Seq[Dependency[_]]

dependencies returns the dependencies of an RDD.

dependencies is a final method that no class in Spark can ever override.

Internally, dependencies checks whether the RDD is checkpointed and acts accordingly.

For a checkpointed RDD, dependencies returns a single-element collection with a OneToOneDependency.

For a non-checkpointed RDD, the dependencies collection is computed using the getDependencies method.

getDependencies is part of the RDD Contract; it defaults to the dependencies the RDD was created with, and custom RDDs override it to define their own lineage.
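
For example, a shuffle introduces a ShuffleDependency, while map-like operations keep a OneToOneDependency on the parent (output abridged; identifiers vary by session):

scala> val counts = sc.parallelize(0 to 9).map((_, 1)).reduceByKey(_ + _)

scala> counts.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@...)

scala> counts.dependencies.head.rdd.dependencies
res1: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@...)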

Accessing Records For Partition Lazily

iterator(
  split: Partition,
  context: TaskContext): Iterator[T]
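
iterator is a final method that reads the records from the cache when the RDD is persisted, or falls back on computeOrReadCheckpoint otherwise; condensed from the Spark sources:

final def iterator(split: Partition, context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)  // persisted: go through the BlockManager
  } else {
    computeOrReadCheckpoint(split, context)  // checkpoint or compute
  }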

Getting CheckpointRDD

checkpointRDD: Option[CheckpointRDD[T]]

checkpointRDD gives the CheckpointRDD from the checkpointData internal registry if available (if the RDD was checkpointed).

checkpointRDD is used when RDD is requested for the dependencies, partitions and preferredLocations.

isCheckpointedAndMaterialized Method

isCheckpointedAndMaterialized: Boolean

isCheckpointedAndMaterialized…​FIXME

isCheckpointedAndMaterialized is used when RDD is requested to computeOrReadCheckpoint, localCheckpoint and isCheckpointed.

getNarrowAncestors Method

getNarrowAncestors: Seq[RDD[_]]

getNarrowAncestors…​FIXME

getNarrowAncestors is used when StageInfo is requested to fromStage.

toLocalIterator Method

toLocalIterator: Iterator[T]

toLocalIterator…​FIXME

Persisting RDD

persist(): this.type
persist(
  newLevel: StorageLevel): this.type

Refer to Persisting RDD.

persist Internal Method

persist(
  newLevel: StorageLevel,
  allowOverride: Boolean): this.type

persist…​FIXME

persist (private) is used when RDD is requested to persist and localCheckpoint.

unpersist Method

unpersist(blocking: Boolean = true): this.type

unpersist…​FIXME

localCheckpoint Method

localCheckpoint(): this.type

localCheckpoint marks this RDD for local checkpointing using Spark’s caching layer.
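
A minimal usage sketch (local checkpointing trades proper fault tolerance for performance, as the checkpointed data lives in executor storage only):

val rdd = sc.parallelize(0 to 9).map(_ * 2).localCheckpoint()
rdd.count()  // the first job materializes the local checkpoint and truncates the lineage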

Computing Partition or Reading From Checkpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint reads the split partition from a checkpoint (if one is already available) or computes it itself.
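
In effect (condensed from the Spark sources):

if (isCheckpointedAndMaterialized) {
  firstParent[T].iterator(split, context)  // the first parent is now the CheckpointRDD
} else {
  compute(split, context)
}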

computeOrReadCheckpoint is used when RDD is requested to compute records for a partition or getOrCompute.

Getting Number of Partitions

getNumPartitions: Int

getNumPartitions gives the number of partitions of an RDD.

scala> sc.textFile("README.md").getNumPartitions
res0: Int = 2

scala> sc.textFile("README.md", 5).getNumPartitions
res1: Int = 5

Defining Placement Preferences of RDD Partition

preferredLocations(
  split: Partition): Seq[String]

preferredLocations requests the CheckpointRDD for placement preferences (if the RDD is checkpointed) or calculates them itself.

preferredLocations is a template method that uses getPreferredLocations, which custom RDDs can override to specify placement preferences for a partition. getPreferredLocations defines no placement preferences by default.

preferredLocations is mainly used when DAGScheduler is requested to compute the preferred locations for missing partitions.

Accessing RDD Partitions

partitions: Array[Partition]

partitions returns the Partitions of an RDD.

partitions requests the CheckpointRDD for the partitions (if the RDD is checkpointed) or finds them itself and caches the result (in the partitions_ internal registry that is used the next time).

Partitions have the property that their internal index should be equal to their position in the owning RDD.
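
For example:

scala> val rdd = sc.parallelize(0 to 9, 3)

scala> rdd.partitions.map(_.index)
res0: Array[Int] = Array(0, 1, 2)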

markCheckpointed Method

markCheckpointed(): Unit

markCheckpointed…​FIXME

markCheckpointed is used when…​FIXME

doCheckpoint Method

doCheckpoint(): Unit

doCheckpoint…​FIXME

doCheckpoint is used when SparkContext is requested to run a job (synchronously).

Reliable Checkpointing — checkpoint Method

checkpoint(): Unit

checkpoint…​FIXME

checkpoint is used when…​FIXME
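
A minimal usage sketch, assuming a hypothetical checkpoint directory (SparkContext.setCheckpointDir is required beforehand):

sc.setCheckpointDir("/tmp/checkpoints")  // hypothetical directory

val rdd = sc.parallelize(0 to 9)
rdd.checkpoint()  // marks the RDD for reliable checkpointing; nothing is saved yet
rdd.count()       // the first job afterwards writes the RDD to the checkpoint directory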

isReliablyCheckpointed Method

isReliablyCheckpointed: Boolean

isReliablyCheckpointed…​FIXME

isReliablyCheckpointed is used when…​FIXME

getCheckpointFile Method

getCheckpointFile: Option[String]

getCheckpointFile…​FIXME

getCheckpointFile is used when…​FIXME