RDD — Description of Distributed Computation

[[T]] RDD is a description of a fault-tolerant and resilient computation over a possibly distributed collection of records (of type T).

Recursive Dependencies

toDebugString: String



doCheckpoint(): Unit


doCheckpoint is used when SparkContext is requested to run a job synchronously.

Implicit Methods


rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]

rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.

rddToOrderedRDDFunctions is used (implicitly) when:

== [[contract]] RDD Contract

=== [[compute]] Computing Partition (in TaskContext)

[source, scala]

compute(split: Partition, context: TaskContext): Iterator[T]

compute computes the input split partition in the TaskContext to produce a collection of values (of type T).

compute is implemented by any type of RDD in Spark and is called every time the records are requested unless the RDD is cached or checkpointed (in which case the records can be read from external storage, this time closer to the compute node).

When an RDD is cached, for specified storage levels (i.e. all but NONE)...FIXME

compute runs on executors (as part of a task, hence the TaskContext argument).

compute is used when RDD is requested to <<computeOrReadCheckpoint, computeOrReadCheckpoint>>.
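The effect of caching on how often records are computed can be observed with an accumulator. The following is a minimal sketch, not part of Spark itself: the ComputeOnceDemo object, the local[2] master and the application name are assumptions made for illustration, and the counts hold only if the cached blocks fit in memory and no task is retried.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object (not part of Spark)
object ComputeOnceDemo {
  // Returns how many records went through the map function after two actions
  def run(cached: Boolean): Long = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("compute-once-demo")
    val sc = new SparkContext(conf)
    try {
      val computed = sc.longAccumulator("computed records")
      val mapped = sc.parallelize(1 to 4, numSlices = 2).map { n =>
        computed.add(1) // side effect that reveals every (re)computation
        n
      }
      val rdd = if (cached) mapped.cache() else mapped
      rdd.count() // first action: always computes the records
      rdd.count() // second action: recomputes only when not cached
      computed.sum
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    println(s"not cached: ${run(cached = false)}")
    println(s"cached:     ${run(cached = true)}")
  }
}
```

Under these assumptions the uncached run counts every record twice (once per action), while the cached run counts each record only once.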

=== [[getPartitions]] Partitions

[source, scala]

getPartitions: Array[Partition]

getPartitions is used when RDD is requested for the <<partitions, partitions>> (called only once as the value is cached afterwards).

=== [[getDependencies]] Dependencies

[source, scala]

getDependencies: Seq[Dependency[_]]

getDependencies is used when RDD is requested for the <<dependencies, dependencies>> (called only once as the value is cached afterwards).

=== [[getPreferredLocations]] Preferred Locations (Placement Preferences)

[source, scala]

getPreferredLocations(split: Partition): Seq[String] = Nil

getPreferredLocations is used when RDD is requested for the <<preferredLocations, preferred locations>> of a given partition.

=== [[partitioner]] Partitioner

[source, scala]

partitioner: Option[Partitioner] = None

An RDD can have a Partitioner defined.
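The contract can be illustrated with a small custom RDD that has no parent RDDs. This is a sketch under stated assumptions: SlicePartition, SliceRDD and SliceRDDDemo are hypothetical names, and the local[2] master is chosen only to keep the example self-contained. Only compute and getPartitions are overridden here; the other members keep their defaults.

```scala
import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type that carries only its index
class SlicePartition(override val index: Int) extends Partition

// Hypothetical RDD with no parents: every partition yields `perPartition` consecutive integers
class SliceRDD(sc: SparkContext, numSlices: Int, perPartition: Int)
    extends RDD[Int](sc, Nil) {

  // Partition i is placed at position i of the array
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => new SlicePartition(i))

  // Produces the records of a single partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val start = split.index * perPartition
    Iterator.range(start, start + perPartition)
  }
}

object SliceRDDDemo {
  def run(): (Int, Seq[Int]) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("slice-rdd-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = new SliceRDD(sc, numSlices = 3, perPartition = 2)
      (rdd.getNumPartitions, rdd.collect().toSeq)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```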

== [[extensions]][[implementations]] (Subset of) Available RDDs

[cols="30,70",options="header",width="100%"]
|===
| RDD
| Description

| CoGroupedRDD
| [[CoGroupedRDD]]

| CoalescedRDD
| [[CoalescedRDD]] Result of repartition or coalesce transformations

| HadoopRDD
| [[HadoopRDD]] Allows for reading data stored in HDFS using the older MapReduce API. The most notable use case is the return RDD of SparkContext.textFile.

| MapPartitionsRDD
| [[MapPartitionsRDD]] Result of calling map-like operations (e.g. map, flatMap, filter, mapPartitions)

| ParallelCollectionRDD
| [[ParallelCollectionRDD]]

| ShuffledRDD
| [[ShuffledRDD]] Result of "shuffle" operators (e.g. repartition or coalesce)

|===

== [[creating-instance]] Creating Instance

RDD takes the following to be created:

  • [[_sc]] SparkContext
  • [[deps]] Parent RDDs, i.e. Dependencies (that have to be all computed successfully before this RDD)

RDD is an abstract class and cannot be created directly. It is created indirectly for the <<implementations, concrete RDDs>>.

== [[storageLevel]][[getStorageLevel]] StorageLevel

An RDD can have a StorageLevel specified. The default StorageLevel is NONE.

storageLevel can be specified using <<persist, persist>> method.

storageLevel becomes NONE again after <<unpersist, unpersist>>.

The current StorageLevel is available using getStorageLevel method.

[source, scala]

getStorageLevel: StorageLevel
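The lifecycle of the storage level can be sketched as follows. StorageLevelDemo, the local[2] master and the application name are assumptions made for illustration; blocking is passed explicitly so the example does not depend on the default value of the unpersist parameter.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object (not part of Spark)
object StorageLevelDemo {
  // Returns the storage level before persist, after persist and after unpersist
  def run(): Seq[StorageLevel] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("storage-level-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10)
      val initial = rdd.getStorageLevel      // default level
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      val persisted = rdd.getStorageLevel    // level requested with persist
      rdd.unpersist(blocking = true)
      val unpersisted = rdd.getStorageLevel  // back to the default level
      Seq(initial, persisted, unpersisted)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```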

== [[id]] Unique Identifier

[source, scala]

id: Int

id is a unique identifier (aka RDD ID) in the given <<_sc, SparkContext>>.

id requests the <<_sc, SparkContext>> for newRddId right when RDD is created.

== [[isBarrier_]][[isBarrier]] Barrier Stage

An RDD can be part of a barrier stage. By default, isBarrier flag is enabled (true) when:

. There are no ShuffleDependencies among the <<dependencies, RDD dependencies>>

. There is at least one parent RDD that has the flag enabled

ShuffledRDD has the flag always disabled.

MapPartitionsRDD is the only RDD that can have the flag enabled.

== [[getOrCompute]] Getting Or Computing RDD Partition

[source, scala]

getOrCompute(partition: Partition, context: TaskContext): Iterator[T]

getOrCompute creates an RDDBlockId for the <<id, RDD id>> and the partition index.

getOrCompute requests the BlockManager to getOrElseUpdate for the block ID (with the <<storageLevel, storage level>> and the makeIterator function).

NOTE: getOrCompute uses SparkEnv to access the current BlockManager.

[[getOrCompute-readCachedBlock]] getOrCompute records whether...FIXME (readCachedBlock)

getOrCompute branches off per the response from the BlockManager and whether the internal readCachedBlock flag is now on or still off. In either case, getOrCompute creates an InterruptibleIterator.

NOTE: InterruptibleIterator simply delegates to a wrapped internal Iterator, but allows for task killing functionality.

For a BlockResult available and readCachedBlock flag on, getOrCompute...FIXME

For a BlockResult available and readCachedBlock flag off, getOrCompute...FIXME

NOTE: The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and BlockManager.getOrElseUpdate gives a Left value with the BlockResult).

For Right(iter) (regardless of the value of readCachedBlock flag since...FIXME), getOrCompute...FIXME

NOTE: BlockManager.getOrElseUpdate gives a Right(iter) value when the block could not be stored (cached), in which case iter is the iterator over the computed values.

NOTE: getOrCompute is used on Spark executors.

NOTE: getOrCompute is used exclusively when RDD is requested for the <<iterator, iterator>>.

== [[dependencies]] RDD Dependencies

[source, scala]

dependencies: Seq[Dependency[_]]

dependencies returns the dependencies of an RDD.

NOTE: dependencies is a final method that no class in Spark can ever override.

Internally, dependencies checks out whether the RDD is checkpointed and acts accordingly.

For a checkpointed RDD, dependencies returns a single-element collection with a OneToOneDependency (on the CheckpointRDD).

For a non-checkpointed RDD, dependencies collection is computed using <<getDependencies, getDependencies>> method.

NOTE: getDependencies is not abstract. By default it returns the <<deps, dependencies>> the RDD was created with, and custom RDDs can override it.
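The kind of dependency an RDD reports depends on the transformation that created it. The following sketch (DependenciesDemo, the local[2] master and the application name are assumptions for illustration) inspects the first dependency of a map result and of a reduceByKey result over an RDD with no partitioner.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

// Hypothetical demo object (not part of Spark)
object DependenciesDemo {
  // Returns (map result has a narrow dependency, reduceByKey result has a shuffle dependency)
  def run(): (Boolean, Boolean) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("dependencies-demo")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(1 to 10).map(n => (n % 2, n))
      val sums = pairs.reduceByKey(_ + _)
      val narrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
      val shuffle = sums.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
      (narrow, shuffle)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```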

== [[iterator]] Accessing Records For Partition Lazily

[source, scala]

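iterator(split: Partition, context: TaskContext): Iterator[T]

iterator <<getOrCompute, gets or computes the split partition>> when the RDD is cached or <<computeOrReadCheckpoint, computes it or reads it from a checkpoint>> otherwise.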

== [[checkpointRDD]] Getting CheckpointRDD

[source, scala]

checkpointRDD: Option[CheckpointRDD[T]]

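checkpointRDD gives the CheckpointRDD from the checkpointData internal registry if available (if the RDD was checkpointed).

checkpointRDD is used when RDD is requested for the <<dependencies, dependencies>>, <<partitions, partitions>> and <<preferredLocations, preferredLocations>>.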

== [[isCheckpointedAndMaterialized]] isCheckpointedAndMaterialized Method

[source, scala]

isCheckpointedAndMaterialized: Boolean


isCheckpointedAndMaterialized is used when RDD is requested to <>, <> and <>.

== [[getNarrowAncestors]] getNarrowAncestors Method

[source, scala]

getNarrowAncestors: Seq[RDD[_]]


getNarrowAncestors is used when StageInfo is requested to fromStage.

== [[toLocalIterator]] toLocalIterator Method

[source, scala]

toLocalIterator: Iterator[T]


== [[persist]] Persisting RDD

[source, scala]

persist(): this.type
persist(newLevel: StorageLevel): this.type

Refer to Persisting RDD.

== [[persist-internal]] persist Internal Method

[source, scala]

persist(newLevel: StorageLevel, allowOverride: Boolean): this.type


persist (private) is used when RDD is requested to <<persist, persist>> and <<localCheckpoint, localCheckpoint>>.

== [[unpersist]] unpersist Method

[source, scala]

unpersist(blocking: Boolean = true): this.type


== [[localCheckpoint]] localCheckpoint Method

[source, scala]

localCheckpoint(): this.type

localCheckpoint marks this RDD for local checkpointing using Spark's caching layer.
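Local checkpointing takes effect only after an action has run a job on the RDD. The sketch below assumes a local[2] master with default configuration (in particular, no dynamic allocation); LocalCheckpointDemo and the application name are hypothetical names used for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object (not part of Spark)
object LocalCheckpointDemo {
  // Returns (checkpointed before the action, checkpointed after the action, checkpoint file)
  def run(): (Boolean, Boolean, Option[String]) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("local-checkpoint-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10).map(_ * 2)
      rdd.localCheckpoint()           // only marks the RDD, nothing is computed yet
      val before = rdd.isCheckpointed
      rdd.count()                     // the action triggers the actual checkpointing
      val after = rdd.isCheckpointed
      // A local checkpoint is not written to a checkpoint directory
      (before, after, rdd.getCheckpointFile)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```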

== [[computeOrReadCheckpoint]] Computing Partition or Reading From Checkpoint

[source, scala]

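computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T]

computeOrReadCheckpoint reads the split partition from a checkpoint (<<isCheckpointedAndMaterialized, if the RDD is checkpointed and materialized>>) or <<compute, computes>> it.

computeOrReadCheckpoint is used when RDD is requested to <<iterator, iterator>> or <<getOrCompute, getOrCompute>>.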

== [[getNumPartitions]] Getting Number of Partitions

[source, scala]

getNumPartitions: Int

getNumPartitions gives the number of partitions of an RDD.

[source, scala]

scala> sc.textFile("").getNumPartitions
res0: Int = 2

scala> sc.textFile("", 5).getNumPartitions
res1: Int = 5

== [[preferredLocations]] Defining Placement Preferences of RDD Partition

[source, scala]

preferredLocations(split: Partition): Seq[String]

preferredLocations requests the CheckpointRDD for the preferred locations (if the RDD is checkpointed) or <<getPreferredLocations, getPreferredLocations>>.

preferredLocations is a template method that uses <<getPreferredLocations, getPreferredLocations>> that custom RDDs can override to specify placement preferences for a partition. getPreferredLocations defines no placement preferences by default.

preferredLocations is mainly used when DAGScheduler is requested to compute the preferred locations for missing partitions.
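Placement preferences can be inspected per partition. The sketch below uses the SparkContext.makeRDD variant that accepts location preferences along with the records and creates one partition per record. The host names host-a and host-b are made up, and PreferredLocationsDemo, the local[2] master and the application name are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object (not part of Spark)
object PreferredLocationsDemo {
  // Returns the preferred locations of every partition, in partition order
  def run(): Seq[Seq[String]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("preferred-locations-demo")
    val sc = new SparkContext(conf)
    try {
      // Each record comes with its own (made-up) preferred hosts
      val rdd = sc.makeRDD(Seq((1, Seq("host-a")), (2, Seq("host-b"))))
      rdd.partitions.map(p => rdd.preferredLocations(p)).toSeq
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```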

== [[partitions]] Accessing RDD Partitions

[source, scala]

partitions: Array[Partition]

partitions returns the Partitions of an RDD.

partitions requests CheckpointRDD for the partitions (if the RDD is checkpointed) or <<getPartitions, getPartitions>> and caches the result (in an internal registry that is used next time).

Partitions have the property that their internal index should be equal to their position in the owning RDD.
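The index property can be checked directly on any RDD. A minimal sketch follows, with PartitionsDemo, the local[2] master and the application name as assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object (not part of Spark)
object PartitionsDemo {
  // Returns the index of every partition, in the order they appear in the RDD
  def run(): Seq[Int] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitions-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100, numSlices = 4)
      rdd.partitions.map(_.index).toSeq
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each index is expected to match the position of the partition in the array.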

== [[markCheckpointed]] markCheckpointed Method

[source, scala]

markCheckpointed(): Unit


markCheckpointed is used when...FIXME

== [[checkpoint]] Reliable Checkpointing -- checkpoint Method

[source, scala]

checkpoint(): Unit


checkpoint is used when...FIXME

== [[isReliablyCheckpointed]] isReliablyCheckpointed Method

[source, scala]

isReliablyCheckpointed: Boolean


isReliablyCheckpointed is used when...FIXME

== [[getCheckpointFile]] getCheckpointFile Method

[source, scala]

getCheckpointFile: Option[String]


getCheckpointFile is used when...FIXME

Last update: 2020-11-27