
= RDD — Description of Distributed Computation

[[T]] RDD is a description of a fault-tolerant and resilient computation over a possibly distributed collection of records (of type T).
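
For example (a minimal sketch that assumes a Spark shell with an active `sc`), transformations merely extend the description of the computation, while an action triggers its actual (possibly distributed) execution:

[source, scala]
----
// Transformations only build up the description of the computation...
val doubledEvens = sc.parallelize(1 to 100)  // ParallelCollectionRDD
  .map(_ * 2)                                // MapPartitionsRDD
  .filter(_ % 4 == 0)                        // MapPartitionsRDD

// ...while an action triggers the (possibly distributed) computation.
val count = doubledEvens.count
----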

== [[toDebugString]] Recursive Dependencies

[source, scala]
----
toDebugString: String
----

toDebugString...FIXME
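
As an illustration (a sketch; the exact RDD ids, default parallelism and console line numbers depend on the environment), toDebugString prints out the recursive lineage of an RDD:

[source, scala]
----
scala> val rdd = sc.parallelize(0 to 9).map(_ + 1).repartition(2)

scala> println(rdd.toDebugString)
(2) MapPartitionsRDD[5] at repartition at <console>:24 []
 |  CoalescedRDD[4] at repartition at <console>:24 []
 |  ShuffledRDD[3] at repartition at <console>:24 []
 +-(8) MapPartitionsRDD[2] at repartition at <console>:24 []
    |  MapPartitionsRDD[1] at map at <console>:24 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
----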

== [[doCheckpoint]] doCheckpoint

[source, scala]
----
doCheckpoint(): Unit
----

doCheckpoint...FIXME

doCheckpoint is used when SparkContext is requested to run a job synchronously.

== [[contract]] RDD Contract

=== [[compute]] Computing Partition (in TaskContext)

[source, scala]
----
compute(
  split: Partition,
  context: TaskContext): Iterator[T]
----

compute computes the given spark-rdd-partitions.md[partition] (split) in the scheduler:spark-TaskContext.md[TaskContext] to produce a collection of values (of type T).

compute is implemented by every concrete RDD in Spark and is called every time the records are requested, unless the RDD is spark-rdd-caching.md[cached] or ROOT:rdd-checkpointing.md[checkpointed] (in which case the records can be read from external storage, closer to the compute node).

When an RDD is spark-rdd-caching.md[cached], for specified storage:StorageLevel.md[storage levels] (i.e. all but NONE)...FIXME

compute is executed as part of tasks running on executors (which live in the ROOT:spark-driver.md[driver] JVM in local mode).

compute is used when RDD is requested to <<computeOrReadCheckpoint, compute a partition or read it from a checkpoint>>.

=== [[getPartitions]] Partitions

[source, scala]
----
getPartitions: Array[Partition]
----

getPartitions is used when RDD is requested for the <<partitions, partitions>> (called only once as the value is cached afterwards).

=== [[getDependencies]] Dependencies

[source, scala]
----
getDependencies: Seq[Dependency[_]]
----

getDependencies is used when RDD is requested for the <<dependencies, dependencies>> (called only once as the value is cached afterwards).

=== [[getPreferredLocations]] Preferred Locations (Placement Preferences)

[source, scala]
----
getPreferredLocations(
  split: Partition): Seq[String] = Nil
----


getPreferredLocations is used when RDD is requested for the <<preferredLocations, preferred locations>> of a given spark-rdd-Partition.md[partition].

=== [[partitioner]] Partitioner

[source, scala]
----
partitioner: Option[Partitioner] = None
----

RDD can have a Partitioner.md[Partitioner] defined.
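
With the contract covered, here is a minimal sketch of a hypothetical custom RDD that implements the two required methods, compute and getPartitions, and relies on the defaults for the rest:

[source, scala]
----
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A hypothetical RDD with n partitions, each producing its own index.
class IndexRDD(sc: SparkContext, n: Int) extends RDD[Int](sc, Nil) {

  // Computing Partition (in TaskContext)
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.single(split.index)

  // Partitions (called once; the result is cached by RDD.partitions)
  override protected def getPartitions: Array[Partition] =
    Array.tabulate(n)(i => new Partition { override def index: Int = i })
}

// new IndexRDD(sc, 4).collect gives Array(0, 1, 2, 3)
----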

== [[extensions]][[implementations]] (Subset of) Available RDDs

[cols="30,70",options="header",width="100%"]
|===
| RDD | Description

| CoGroupedRDD | [[CoGroupedRDD]]

| CoalescedRDD | [[CoalescedRDD]] Result of spark-rdd-partitions.md#repartition[repartition] or spark-rdd-partitions.md#coalesce[coalesce] transformations

| spark-rdd-HadoopRDD.md[HadoopRDD] | [[HadoopRDD]] Allows for reading data stored in HDFS using the older MapReduce API. The most notable use case is the RDD returned by SparkContext.textFile.

| spark-rdd-MapPartitionsRDD.md[MapPartitionsRDD] | [[MapPartitionsRDD]] Result of calling map-like operations (e.g. map, flatMap, filter, spark-rdd-transformations.md#mapPartitions[mapPartitions])

| spark-rdd-ParallelCollectionRDD.md[ParallelCollectionRDD] | [[ParallelCollectionRDD]]

| ShuffledRDD.md[ShuffledRDD] | [[ShuffledRDD]] Result of "shuffle" operators (e.g. spark-rdd-partitions.md#repartition[repartition] or spark-rdd-partitions.md#coalesce[coalesce] with shuffle enabled)

|===
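
In the Spark shell, the concrete RDD type shows up in an RDD's string representation (a sketch; RDD ids and console line numbers will differ):

[source, scala]
----
scala> sc.parallelize(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> sc.parallelize(1 to 10).map(_ + 1)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25

scala> sc.parallelize(1 to 10).coalesce(1)
res2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[4] at coalesce at <console>:25
----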

== [[creating-instance]] Creating Instance

RDD takes the following to be created:

* [[_sc]] ROOT:SparkContext.md[]
* [[deps]] Parent RDDs, i.e. Dependencies (that all have to be computed successfully before this RDD)

RDD is an abstract class and cannot be created directly. It is created indirectly for the <<implementations, concrete RDDs>>.

== [[storageLevel]][[getStorageLevel]] StorageLevel

RDD can have a storage:StorageLevel.md[StorageLevel] specified. The default StorageLevel is storage:StorageLevel.md#NONE[NONE].

storageLevel can be specified using the <<persist, persist>> method.

storageLevel becomes NONE again after <<unpersist, unpersisting>>.

The current StorageLevel is available using getStorageLevel method.

[source, scala]
----
getStorageLevel: StorageLevel
----
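
A short sketch of the lifecycle (in a Spark shell):

[source, scala]
----
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10)
assert(rdd.getStorageLevel == StorageLevel.NONE)   // the default

rdd.persist(StorageLevel.MEMORY_AND_DISK)
assert(rdd.getStorageLevel == StorageLevel.MEMORY_AND_DISK)

rdd.unpersist()
assert(rdd.getStorageLevel == StorageLevel.NONE)   // NONE again after unpersist
----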

== [[id]] Unique Identifier

[source, scala]
----
id: Int
----

id is a unique identifier (aka RDD ID) of the RDD in the given <<_sc, SparkContext>>.

id requests the <<_sc, SparkContext>> for a ROOT:SparkContext.md#newRddId[newRddId] right when RDD is created.
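
For example (a sketch that assumes a fresh Spark shell, so the counter starts at 0):

[source, scala]
----
scala> val r0 = sc.parallelize(1 to 10)
scala> val r1 = r0.map(_ + 1)

// Every RDD takes the next id from its SparkContext at creation time.
scala> (r0.id, r1.id)
res0: (Int, Int) = (0,1)
----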

== [[isBarrier_]][[isBarrier]] Barrier Stage

An RDD can be part of a ROOT:spark-barrier-execution-mode.md#barrier-stage[barrier stage]. By default, the isBarrier flag is enabled (true) when:

. There are no ShuffleDependencies among the <<dependencies, RDD dependencies>>

. There is at least one parent RDD that has the flag enabled

ShuffledRDD.md[ShuffledRDD] has the flag always disabled.

spark-rdd-MapPartitionsRDD.md[MapPartitionsRDD] is the only RDD that can have the isBarrier flag enabled.
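
A minimal sketch of how the flag gets enabled: RDD.barrier gives an RDDBarrier, whose mapPartitions produces a MapPartitionsRDD with isBarrier enabled.

[source, scala]
----
import org.apache.spark.BarrierTaskContext

val barrierRDD = sc.parallelize(1 to 10, 2)
  .barrier()                 // RDDBarrier[Int]
  .mapPartitions { it =>     // MapPartitionsRDD with isBarrier enabled
    val tc = BarrierTaskContext.get()
    tc.barrier()             // wait for all tasks in the barrier stage
    it
  }
----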

== [[getOrCompute]] Getting Or Computing RDD Partition

[source, scala]
----
getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]
----


getOrCompute creates a storage:BlockId.md#RDDBlockId[RDDBlockId] for the <<id, RDD id>> and the spark-rdd-Partition.md#index[partition index].

getOrCompute requests the BlockManager to storage:BlockManager.md#getOrElseUpdate[getOrElseUpdate] for the block ID (with the <<storageLevel, StorageLevel>> and the makeIterator function).

NOTE: getOrCompute uses core:SparkEnv.md#get[SparkEnv] to access the current core:SparkEnv.md#blockManager[BlockManager].

[[getOrCompute-readCachedBlock]] getOrCompute records whether...FIXME (readCachedBlock)

getOrCompute branches off based on the response from storage:BlockManager.md#getOrElseUpdate[BlockManager.getOrElseUpdate] and whether the internal readCachedBlock flag is now on or still off. In either case, getOrCompute creates an spark-InterruptibleIterator.md[InterruptibleIterator].

NOTE: spark-InterruptibleIterator.md[InterruptibleIterator] simply delegates to a wrapped internal Iterator, but allows for spark-TaskContext.md#isInterrupted[task killing functionality].

For a BlockResult available and readCachedBlock flag on, getOrCompute...FIXME

For a BlockResult available and readCachedBlock flag off, getOrCompute...FIXME

NOTE: The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and storage:BlockManager.md#getOrElseUpdate[BlockManager.getOrElseUpdate] gives a Left value with the BlockResult).

For Right(iter) (regardless of the value of readCachedBlock flag since...FIXME), getOrCompute...FIXME

NOTE: storage:BlockManager.md#getOrElseUpdate[BlockManager.getOrElseUpdate] gives a Right(iter) value to indicate an error with a block.
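
Putting the above together, a simplified sketch of getOrCompute's body (using the names from this section rather than standalone code; not the exact implementation):

[source, scala]
----
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}) match {
  case Left(blockResult) =>
    // The block was found locally or remotely, or has just been stored.
    new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
  case Right(iter) =>
    // The block could not be stored; use the computed iterator directly.
    new InterruptibleIterator(context, iter)
}
----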

NOTE: getOrCompute is used on Spark executors.

NOTE: getOrCompute is used exclusively when RDD is requested for the <<iterator, iterator of a partition>>.

== [[dependencies]] RDD Dependencies

[source, scala]
----
dependencies: Seq[Dependency[_]]
----

dependencies returns the dependencies of an RDD.

NOTE: dependencies is a final method that no class in Spark can ever override.

Internally, dependencies checks whether the RDD is ROOT:rdd-checkpointing.md[checkpointed] and acts accordingly.

For an RDD that is checkpointed, dependencies returns a single-element collection with a OneToOneDependency.

For a non-checkpointed RDD, the dependencies collection is computed using the <<getDependencies, getDependencies>> method.

NOTE: getDependencies method is an abstract method that custom RDDs are required to provide.
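
For example (a sketch; the exact toString output differs per run):

[source, scala]
----
scala> val pairs = sc.parallelize(1 to 10).map(n => (n % 2, n))

scala> pairs.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@...)

scala> pairs.reduceByKey(_ + _).dependencies
res1: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@...)
----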

== [[iterator]] Accessing Records For Partition Lazily

[source, scala]
----
iterator(
  split: Partition,
  context: TaskContext): Iterator[T]
----


iterator <<getOrCompute, gets or computes>> the split partition when spark-rdd-caching.md[cached], or <<computeOrReadCheckpoint, computes or reads it from a checkpoint>> otherwise.
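
The method is final and short; a sketch of its body:

[source, scala]
----
final def iterator(split: Partition, context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)            // serve from (or store into) the caching layer
  } else {
    computeOrReadCheckpoint(split, context) // no caching requested
  }
----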

== [[checkpointRDD]] Getting CheckpointRDD

[source, scala]
----
checkpointRDD: Option[CheckpointRDD[T]]
----

checkpointRDD gives the CheckpointRDD from the checkpointData internal registry if available (i.e. when the RDD was checkpointed).

checkpointRDD is used when RDD is requested for the <<dependencies, dependencies>>, <<partitions, partitions>> and <<preferredLocations, preferred locations>>.

== [[isCheckpointedAndMaterialized]] isCheckpointedAndMaterialized Method

[source, scala]
----
isCheckpointedAndMaterialized: Boolean
----

isCheckpointedAndMaterialized...FIXME

isCheckpointedAndMaterialized is used when RDD is requested to <<computeOrReadCheckpoint, computeOrReadCheckpoint>>, to <<localCheckpoint, localCheckpoint>> and for the isCheckpointed flag.

== [[getNarrowAncestors]] getNarrowAncestors Method

[source, scala]
----
getNarrowAncestors: Seq[RDD[_]]
----

getNarrowAncestors...FIXME

getNarrowAncestors is used when StageInfo is requested to scheduler:spark-scheduler-StageInfo.md#fromStage[fromStage].

== [[toLocalIterator]] toLocalIterator Method

[source, scala]
----
toLocalIterator: Iterator[T]
----

toLocalIterator...FIXME
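
In the meantime, a usage sketch: toLocalIterator returns records to the driver one partition at a time, so the driver only needs as much memory as the largest partition (at the cost of running a separate job per partition):

[source, scala]
----
sc.parallelize(1 to 1000, 10)
  .toLocalIterator   // partitions are fetched lazily, one job per partition
  .take(3)
  .foreach(println)
----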

== [[persist]] Persisting RDD

[source, scala]
----
persist(): this.type
persist(
  newLevel: StorageLevel): this.type
----


Refer to spark-rdd-caching.md#persist[Persisting RDD].

== [[persist-internal]] persist Internal Method

[source, scala]
----
persist(
  newLevel: StorageLevel,
  allowOverride: Boolean): this.type
----


persist...FIXME

persist (private) is used when RDD is requested to <<persist, persist>> and <<localCheckpoint, localCheckpoint>>.

== [[unpersist]] unpersist Method

[source, scala]
----
unpersist(blocking: Boolean = true): this.type
----

unpersist...FIXME

== [[localCheckpoint]] localCheckpoint Method

[source, scala]
----
localCheckpoint(): this.type
----

localCheckpoint marks this RDD for ROOT:rdd-checkpointing.md[local checkpointing] using Spark's caching layer.
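
A usage sketch (local checkpointing trades fault tolerance for performance, since the checkpointed data lives in executor storage only):

[source, scala]
----
val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.localCheckpoint()  // marks the RDD; nothing is materialized yet
rdd.count()            // an action materializes the checkpoint and truncates the lineage
----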

== [[computeOrReadCheckpoint]] Computing Partition or Reading From Checkpoint

[source, scala]
----
computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]
----


computeOrReadCheckpoint reads the split partition from a checkpoint (if the RDD is <<isCheckpointedAndMaterialized, checkpointed and materialized>>) or <<compute, computes it>> itself.

computeOrReadCheckpoint is used when RDD is requested for the <<iterator, records of a partition>> or to <<getOrCompute, getOrCompute>>.
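
A sketch of the decision:

[source, scala]
----
if (isCheckpointedAndMaterialized) {
  // The CheckpointRDD is now the one and only parent.
  firstParent[T].iterator(split, context)
} else {
  compute(split, context)
}
----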

== [[getNumPartitions]] Getting Number of Partitions

[source, scala]
----
getNumPartitions: Int
----

getNumPartitions gives the number of partitions of an RDD.

[source, scala]
----
scala> sc.textFile("README.md").getNumPartitions
res0: Int = 2

scala> sc.textFile("README.md", 5).getNumPartitions
res1: Int = 5
----


== [[preferredLocations]] Defining Placement Preferences of RDD Partition

[source, scala]
----
preferredLocations(
  split: Partition): Seq[String]
----


preferredLocations requests the <<checkpointRDD, CheckpointRDD>> for the placement preferences (if the RDD is checkpointed) or <<getPreferredLocations, calculates them itself>>.

preferredLocations is a template method that uses <<getPreferredLocations, getPreferredLocations>>, which custom RDDs can override to specify placement preferences for a partition. getPreferredLocations defines no placement preferences by default.

preferredLocations is mainly used when DAGScheduler is requested to scheduler:DAGScheduler.md#getPreferredLocs[compute the preferred locations for missing partitions].
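
A sketch of the body (ask the CheckpointRDD when checkpointed, otherwise delegate to the overridable getPreferredLocations):

[source, scala]
----
final def preferredLocations(split: Partition): Seq[String] =
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
----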

== [[partitions]] Accessing RDD Partitions

[source, scala]
----
partitions: Array[Partition]
----

partitions returns the spark-rdd-partitions.md[partitions] of an RDD.

partitions requests the <<checkpointRDD, CheckpointRDD>> for the partitions (if the RDD is checkpointed) or <<getPartitions, computes them itself>> and caches the result (in the partitions_ internal registry that is used next time).

Partitions have the property that their internal index should be equal to their position in the owning RDD.
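
A simplified sketch of how partitions are resolved and cached, including the index invariant above (not the exact implementation):

[source, scala]
----
final def partitions: Array[Partition] =
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      // Enforce the invariant: a partition's index equals its position.
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index)
      }
    }
    partitions_
  }
----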

== [[markCheckpointed]] markCheckpointed Method

[source, scala]
----
markCheckpointed(): Unit
----

markCheckpointed...FIXME

markCheckpointed is used when...FIXME

== [[checkpoint]] Reliable Checkpointing -- checkpoint Method

[source, scala]
----
checkpoint(): Unit
----

checkpoint...FIXME

checkpoint is used when...FIXME

== [[isReliablyCheckpointed]] isReliablyCheckpointed Method

[source, scala]
----
isReliablyCheckpointed: Boolean
----

isReliablyCheckpointed...FIXME

isReliablyCheckpointed is used when...FIXME

== [[getCheckpointFile]] getCheckpointFile Method

[source, scala]
----
getCheckpointFile: Option[String]
----

getCheckpointFile...FIXME

getCheckpointFile is used when...FIXME

