RDD — Description of Distributed Computation¶

RDD[T] is an abstraction of fault-tolerant, resilient distributed datasets, which are mere descriptions of computations over a distributed collection of records (of type T).
Contract¶
Computing Partition¶
```scala
compute(
  split: Partition,
  context: TaskContext): Iterator[T]
```

Computes the input Partition (with the TaskContext) to produce values (of type T).
Used when:

- RDD is requested to computeOrReadCheckpoint
getPartitions¶
```scala
getPartitions: Array[Partition]
```
Used when:

- RDD is requested for the partitions
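Together, compute and getPartitions are the contract a custom RDD implements. A minimal sketch follows (the ConstantRDD name and its behavior are made up for illustration):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative Partition (its index must match its position among the partitions)
case class ConstantPartition(index: Int) extends Partition

// A hypothetical RDD that yields `value` exactly `count` times per partition
class ConstantRDD(sc: SparkContext, value: Int, count: Int, numPartitions: Int)
  extends RDD[Int](sc, Nil) { // Nil = no parent RDDs (dependencies)

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numPartitions)(ConstantPartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.fill(count)(value)
}
```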
Implementations¶
- CheckpointRDD
- CoalescedRDD
- CoGroupedRDD
- HadoopRDD
- MapPartitionsRDD
- NewHadoopRDD
- ParallelCollectionRDD
- ReliableCheckpointRDD
- ShuffledRDD
- SubtractedRDD
- others
Creating Instance¶
RDD
takes the following to be created:
- SparkContext
- Dependencies (Parent RDDs that should be computed successfully before this RDD)
Abstract Class

RDD is an abstract class and cannot be created directly. It is created indirectly through one of the concrete RDDs.
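For example, transformations give concrete RDDs behind the scenes:

```scala
val ints = sc.parallelize(0 to 9) // a ParallelCollectionRDD
val doubled = ints.map(_ * 2)     // a MapPartitionsRDD
```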
Stage-Level Scheduling¶
withResources¶
```scala
withResources(
  rp: ResourceProfile): this.type
```
withResources sets the given ResourceProfile as the resourceProfile and requests the ResourceProfileManager to add the resource profile.
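A usage sketch (assuming a deployment that supports stage-level scheduling; the resource amounts are made up):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Request bigger executors and 2 CPUs per task for the stages computing this RDD
val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4).memory("8g"))
  .require(new TaskResourceRequests().cpus(2))
  .build()

val ints = sc.parallelize(0 to 9)
val tuned = ints.withResources(rp) // this.type, so further operators can be chained
```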
resourceProfile¶
RDD uses the resourceProfile internal registry for a ResourceProfile.
The ResourceProfile is undefined when RDD is created and is assigned in withResources.

The ResourceProfile is available using getResourceProfile.
getResourceProfile¶
```scala
getResourceProfile(): ResourceProfile
```
getResourceProfile returns the resourceProfile (if defined) or null.
getResourceProfile is used when:

- DAGScheduler is requested for the shuffle dependencies and resource profiles
Preferred Locations (Placement Preferences of Partition)¶
```scala
preferredLocations(
  split: Partition): Seq[String]
```
Final Method
preferredLocations
is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
preferredLocations requests the CheckpointRDD for the preferred locations for the given Partition if this RDD is checkpointed, or falls back on getPreferredLocations otherwise.

preferredLocations is a template method that uses getPreferredLocations that custom RDDs can override to specify placement preferences on their own.
preferredLocations is used when:

- DAGScheduler is requested for preferred locations
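A sketch of overriding the template method (the host names are made up):

```scala
// Inside a custom RDD subclass (e.g. the ConstantRDD sketch above):
// prefer executors on the hosts assumed to hold the partition's data
override protected def getPreferredLocations(split: Partition): Seq[String] =
  Seq("worker-1.example.com", "worker-2.example.com")
```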
Partitions¶
```scala
partitions: Array[Partition]
```
Final Method
partitions
is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
partitions requests the CheckpointRDD for the partitions if this RDD is checkpointed.

Otherwise, when this RDD is not checkpointed, partitions uses getPartitions (and caches the result in the partitions_ internal registry).
Note
getPartitions is an abstract method that custom RDDs are required to provide.
Partitions have the property that their internal index is equal to their position in this RDD.
partitions is used when:

- DAGScheduler is requested to getPreferredLocsInternal
- SparkContext is requested to run a job
- others
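The index invariant can be verified in spark-shell:

```scala
val ints = sc.parallelize(0 to 9, numSlices = 4)
assert(ints.partitions.zipWithIndex.forall { case (p, i) => p.index == i })
```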
dependencies¶
```scala
dependencies: Seq[Dependency[_]]
```
Final Method
dependencies
is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
dependencies branches off based on the checkpointRDD (and the availability of a CheckpointRDD).

With a CheckpointRDD available (this RDD is checkpointed), dependencies returns a OneToOneDependency with the CheckpointRDD.

Otherwise, when this RDD is not checkpointed, dependencies uses getDependencies (and caches the result in the dependencies_ internal registry).
Note
getDependencies is an abstract method that custom RDDs are required to provide.
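For example, a shuffle operator introduces a ShuffleDependency:

```scala
val counts = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).reduceByKey(_ + _)
counts.dependencies.foreach(d => println(d.getClass.getSimpleName))
// ShuffleDependency
```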
Reliable Checkpointing¶
```scala
checkpoint(): Unit
```
checkpoint creates a new ReliableRDDCheckpointData (with this RDD) and saves it in the checkpointData registry.
checkpoint
does nothing when the checkpointData registry has already been defined.
checkpoint throws a SparkException when the checkpoint directory is not specified:

```text
Checkpoint directory has not been set in the SparkContext
```
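A usage sketch (the checkpoint directory is made up; per the Spark API documentation, checkpoint must be called before any job has been executed on this RDD to be effective):

```scala
sc.setCheckpointDir("/tmp/checkpoints") // use reliable storage (e.g. HDFS) in production
val counts = sc.parallelize(0 to 9).map(n => (n % 2, n)).reduceByKey(_ + _)
counts.checkpoint() // marks the RDD for reliable checkpointing
counts.count()      // the first job triggers doCheckpoint
```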
RDDCheckpointData¶
RDD defines the checkpointData internal registry for a RDDCheckpointData[T] (with T being the type of this RDD).

The checkpointData registry is undefined (None) when RDD is created and can be assigned one of the following values:

- ReliableRDDCheckpointData (in checkpoint)
- LocalRDDCheckpointData (in localCheckpoint)
Used when:
- isCheckpointedAndMaterialized
- isLocallyCheckpointed
- isReliablyCheckpointed
- getCheckpointFile
- doCheckpoint
CheckpointRDD¶
```scala
checkpointRDD: Option[CheckpointRDD[T]]
```
checkpointRDD returns the CheckpointRDD of the RDDCheckpointData (if defined, i.e. this RDD is checkpointed).
checkpointRDD is used when:

- RDD is requested for the dependencies, partitions and preferred locations (all using final methods!)
doCheckpoint¶
```scala
doCheckpoint(): Unit
```
doCheckpoint
executes in checkpoint
scope.
doCheckpoint
turns the doCheckpointCalled flag on (to prevent multiple executions).
doCheckpoint branches off based on whether a RDDCheckpointData is defined or not:

- With the RDDCheckpointData defined, doCheckpoint checks the checkpointAllMarkedAncestors flag and, if enabled, requests the RDDs of the Dependencies to doCheckpoint themselves. Otherwise, doCheckpoint requests the RDDCheckpointData to checkpoint.
- With the RDDCheckpointData undefined, doCheckpoint requests the RDDs of the Dependencies to doCheckpoint themselves.

In other words, with the RDDCheckpointData defined, requesting doCheckpoint of the Dependencies is guarded by the checkpointAllMarkedAncestors flag.
doCheckpoint
skips execution if called earlier.
doCheckpoint is used when:

- SparkContext is requested to run a job synchronously
iterator¶
```scala
iterator(
  split: Partition,
  context: TaskContext): Iterator[T]
```
iterator
...FIXME
Final Method
iterator
is a final
method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.
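The gist of iterator, paraphrasing the Spark sources (subject to change across versions): use getOrCompute for a persisted RDD and computeOrReadCheckpoint otherwise.

```scala
// Paraphrased sketch, not the authoritative implementation
final def iterator(split: Partition, context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context) // serve from (or populate) the block manager
  } else {
    computeOrReadCheckpoint(split, context)
  }
```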
getOrCompute¶
```scala
getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]
```
getOrCompute
...FIXME
computeOrReadCheckpoint¶
```scala
computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]
```
computeOrReadCheckpoint
...FIXME
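The gist, again paraphrasing the Spark sources (subject to change across versions): read the values from the CheckpointRDD (the first parent) when checkpointed and materialized, or compute the partition.

```scala
// Paraphrased sketch, not the authoritative implementation
private[spark] def computeOrReadCheckpoint(
    split: Partition,
    context: TaskContext): Iterator[T] =
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context) // the CheckpointRDD
  } else {
    compute(split, context)
  }
```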
Debugging Recursive Dependencies¶
```scala
toDebugString: String
```
toDebugString
returns a RDD Lineage Graph.
```scala
val wordCount = sc.textFile("README.md")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
```

```text
scala> println(wordCount.toDebugString)
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[20] at map at <console>:24 []
    |  MapPartitionsRDD[19] at flatMap at <console>:24 []
    |  README.md MapPartitionsRDD[18] at textFile at <console>:24 []
    |  README.md HadoopRDD[17] at textFile at <console>:24 []
```
toDebugString
uses indentations to indicate a shuffle boundary.
The numbers in round brackets show the level of parallelism at each stage, e.g. (2)
in the above output.
```text
scala> println(wordCount.getNumPartitions)
2
```
With spark.logLineage enabled, toDebugString
is printed out when executing an action.
```text
$ ./bin/spark-shell --conf spark.logLineage=true

scala> sc.textFile("README.md", 4).count
...
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD's recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
 |  README.md HadoopRDD[0] at textFile at <console>:25 []
```
Implicit Methods¶
rddToOrderedRDDFunctions¶
```scala
rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]
```
rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.

rddToOrderedRDDFunctions is used (implicitly) for the operators of OrderedRDDFunctions (e.g. sortByKey) on RDDs of key-value pairs.
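For example, sortByKey is available on an RDD of key-value pairs thanks to the conversion:

```scala
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1)))
val sorted = pairs.sortByKey() // via rddToOrderedRDDFunctions
sorted.collect().foreach(println)
// (a,1)
// (b,2)
```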
Review Me¶
StorageLevel¶

RDD can have a StorageLevel specified. The default StorageLevel is NONE.

storageLevel can be specified using persist.

storageLevel becomes NONE again after unpersist.

The current StorageLevel is available using getStorageLevel method.

```scala
getStorageLevel: StorageLevel
```
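For example:

```scala
import org.apache.spark.storage.StorageLevel

val ints = sc.parallelize(0 to 9)
assert(ints.getStorageLevel == StorageLevel.NONE) // the default

ints.persist(StorageLevel.MEMORY_AND_DISK)
assert(ints.getStorageLevel == StorageLevel.MEMORY_AND_DISK)

ints.unpersist()
assert(ints.getStorageLevel == StorageLevel.NONE) // back to the default
```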
Unique Identifier¶

```scala
id: Int
```

id is a unique identifier (aka RDD ID) of this RDD in the given SparkContext.

id requests the SparkContext for a new RDD ID when RDD is created.
Barrier Stage¶

An RDD can be part of a barrier stage. By default, the isBarrier flag is enabled (true) when:

1. There are no ShuffleDependencies among the dependencies of this RDD
2. There is at least one parent RDD that has the flag enabled

ShuffledRDD has the flag always disabled.

MapPartitionsRDD is the only RDD that can have the flag enabled.
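Barrier execution mode is requested using RDD.barrier that gives an RDDBarrier, e.g.:

```scala
import org.apache.spark.BarrierTaskContext

val ints = sc.parallelize(0 to 9, numSlices = 2)
val barrierRdd = ints.barrier().mapPartitions { it =>
  BarrierTaskContext.get().barrier() // wait until all tasks in the stage reach this point
  it
} // a MapPartitionsRDD with the isBarrier flag enabled
```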
Getting Or Computing RDD Partition¶

```scala
getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]
```

getOrCompute creates a RDDBlockId (for the RDD id and the partition index).

getOrCompute requests the BlockManager to getOrElseUpdate for the block ID (with the StorageLevel and a makeIterator function).

NOTE: getOrCompute uses SparkEnv to access the current BlockManager.
getOrCompute records whether...FIXME (readCachedBlock)

getOrCompute branches off per the response from BlockManager.getOrElseUpdate and whether the internal readCachedBlock flag is now on or still off. In either case, getOrCompute creates an InterruptibleIterator.

NOTE: InterruptibleIterator simply delegates to a wrapped internal Iterator, but allows for task killing functionality.
For a BlockResult available and the readCachedBlock flag on, getOrCompute...FIXME

For a BlockResult available and the readCachedBlock flag off, getOrCompute...FIXME

NOTE: The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and BlockManager.getOrElseUpdate gives a Left value with the BlockResult).

For Right(iter) (regardless of the value of the readCachedBlock flag since...FIXME), getOrCompute...FIXME

NOTE: BlockManager.getOrElseUpdate gives a Right(iter) value to indicate an error with a block.

NOTE: getOrCompute is used on Spark executors.

NOTE: getOrCompute is used exclusively when RDD is requested for the iterator.
Getting CheckpointRDD¶

```scala
checkpointRDD: Option[CheckpointRDD[T]]
```

checkpointRDD gives the CheckpointRDD from the checkpointData registry (if defined).

checkpointRDD is used when RDD is requested for the dependencies, partitions and preferred locations.
isCheckpointedAndMaterialized Method¶

```scala
isCheckpointedAndMaterialized: Boolean
```

isCheckpointedAndMaterialized...FIXME

isCheckpointedAndMaterialized is used when RDD is requested to computeOrReadCheckpoint (among others).
getNarrowAncestors Method¶

```scala
getNarrowAncestors: Seq[RDD[_]]
```

getNarrowAncestors...FIXME

getNarrowAncestors is used when StageInfo is requested to fromStage.
Persisting RDD¶

```scala
persist(): this.type
persist(
  newLevel: StorageLevel): this.type
```

Refer to Persisting RDD.
persist Internal Method¶

```scala
persist(
  newLevel: StorageLevel,
  allowOverride: Boolean): this.type
```

persist...FIXME

persist (private) is used when RDD is requested to persist and localCheckpoint.
Computing Partition or Reading From Checkpoint¶

```scala
computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]
```

computeOrReadCheckpoint reads the split partition from a checkpoint (if this RDD is checkpointed and materialized) or computes it.

computeOrReadCheckpoint is used when RDD is requested for the iterator and getOrCompute.