RDD — Description of Distributed Computation¶
RDD[T] is an abstraction of fault-tolerant resilient distributed datasets that are mere descriptions of computations over a distributed collection of records (of type T).
Contract¶
Computing Partition¶
compute(
split: Partition,
context: TaskContext): Iterator[T]
Computes the input Partition (with the TaskContext) to produce values (of type T)
Used when:
RDD is requested to computeOrReadCheckpoint
Partitions¶
getPartitions: Array[Partition]
Partitions of this RDD
Used when:
RDD is requested for the partitions
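As an illustration of the contract, the following is a minimal sketch of a custom RDD (all names are hypothetical; a SparkContext sc as in spark-shell is assumed) that provides compute and getPartitions:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical RDD over a numeric range, split into a fixed number of slices
case class RangeChunkPartition(index: Int) extends Partition

class RangeChunkRDD(sc: SparkContext, upTo: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) { // Nil = no parent dependencies

  // One Partition per slice; a partition's index equals its position in the array
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => RangeChunkPartition(i))

  // Produce the values of a single partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.range(split.index, upTo, numSlices)
}

val ints = new RangeChunkRDD(sc, upTo = 10, numSlices = 2)
assert(ints.collect().sorted.toSeq == (0 until 10))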
Implementations¶
- CheckpointRDD
- CoalescedRDD
- CoGroupedRDD
- HadoopRDD
- MapPartitionsRDD
- NewHadoopRDD
- ParallelCollectionRDD
- ReliableCheckpointRDD
- ShuffledRDD
- others
Creating Instance¶
RDD takes the following to be created:
- SparkContext
- Dependencies (Parent RDDs that should be computed successfully before this RDD)
Abstract Class
RDD is an abstract class and cannot be created directly. It is created indirectly through one of the concrete implementations.
Barrier RDD¶
Barrier RDD is an RDD with the isBarrier flag enabled.
ShuffledRDD can never be a barrier RDD as it overrides the isBarrier method to be always disabled (false).
isBarrier¶
isBarrier(): Boolean
isBarrier is the value of isBarrier_.
isBarrier is used when:
- DAGScheduler is requested to submitMissingTasks (that are either ShuffleMapStages to create ShuffleMapTasks or ResultStage to create ResultTasks)
- RDDInfo is created
- ShuffleDependency is requested to canShuffleMergeBeEnabled
- DAGScheduler is requested to checkBarrierStageWithRDDChainPattern, checkBarrierStageWithDynamicAllocation, checkBarrierStageWithNumSlots, handleTaskCompletion (FetchFailed case to mark a map stage as broken)
isBarrier_¶
isBarrier_ : Boolean // @transient protected lazy val
isBarrier_ is enabled (true) when there is at least one barrier RDD among the parent RDDs (excluding ShuffleDependencies).
Note
isBarrier_ is overridden by PythonRDD and MapPartitionsRDD that both accept an isFromBarrier flag.
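For reference, the public way to get a barrier RDD is RDD.barrier, whose RDDBarrier.mapPartitions creates a MapPartitionsRDD with the isFromBarrier flag enabled. A minimal sketch (assuming a SparkContext sc; the computation runs in barrier mode only once an action is executed with enough slots):

import org.apache.spark.BarrierTaskContext

val nums = sc.parallelize(1 to 100, numSlices = 4)

// RDD.barrier gives an RDDBarrier; its mapPartitions produces a barrier RDD
val barrierRdd = nums
  .barrier()
  .mapPartitions { it =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier() // all tasks of the barrier stage wait for one another here
    it
  }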
ResourceProfile (Stage-Level Scheduling)¶
RDD can be assigned a ResourceProfile using RDD.withResources method.
val rdd: RDD[_] = ...
rdd
.withResources(...) // request resources for a computation
.mapPartitions(...) // the computation
RDD uses the resourceProfile internal registry for the ResourceProfile, which is undefined initially.
The ResourceProfile is available using RDD.getResourceProfile method.
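A fuller sketch of the above uses ResourceProfileBuilder to build the ResourceProfile (the resource amounts are illustrative only, and stage-level scheduling also requires a supported cluster manager):

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().cores(4).memory("8g"))
  .require(new TaskResourceRequests().cpus(2))
  .build()

val nums = sc.parallelize(1 to 100)
val withProfile = nums.withResources(rp) // registers rp as the resourceProfile of this RDD
assert(withProfile.getResourceProfile() == rp)

// the computation to run with the requested resources
val doubled = withProfile.mapPartitions(_.map(_ * 2))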
withResources¶
withResources(
rp: ResourceProfile): this.type
withResources sets the given ResourceProfile as the resourceProfile and requests the ResourceProfileManager to add the resource profile.
getResourceProfile¶
getResourceProfile(): ResourceProfile
getResourceProfile returns the resourceProfile (if defined) or null.
getResourceProfile is used when:
DAGScheduler is requested for the ShuffleDependencies and ResourceProfiles of an RDD
Preferred Locations (Placement Preferences of Partition)¶
preferredLocations(
split: Partition): Seq[String]
Final Method
preferredLocations is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
preferredLocations requests the CheckpointRDD for the preferred locations of the given Partition if this RDD is checkpointed. Otherwise, preferredLocations falls back to getPreferredLocations.
preferredLocations is a template method that uses getPreferredLocations, which custom RDDs can override to specify placement preferences of their own.
preferredLocations is used when:
DAGScheduler is requested for preferred locations
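For illustration, a custom RDD could express placement preferences as in the following sketch (all names and hosts are hypothetical):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that knows the hosts storing its chunk of data
case class StoreChunkPartition(index: Int, hosts: Seq[String]) extends Partition

class StoreChunkRDD(sc: SparkContext, chunkHosts: Seq[Seq[String]])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](chunkHosts.size)(i => StoreChunkPartition(i, chunkHosts(i)))

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"data of chunk ${split.index}") // placeholder computation

  // Consulted by the final preferredLocations (and so the DAGScheduler)
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    split.asInstanceOf[StoreChunkPartition].hosts
}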
Partitions¶
partitions: Array[Partition]
Final Method
partitions is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
partitions requests the CheckpointRDD for the partitions if this RDD is checkpointed.
Otherwise, when this RDD is not checkpointed, partitions requests getPartitions (and caches the result in partitions_).
Note
getPartitions is an abstract method that custom RDDs are required to provide.
Partitions have the property that their internal index is equal to their position in this RDD (as the snippet after the list below demonstrates).
partitions is used when:
- DAGScheduler is requested to getPreferredLocsInternal
- SparkContext is requested to run a job
- others
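The index property can be observed directly (assuming a SparkContext sc):

val nums = sc.parallelize(1 to 100, numSlices = 8)

// every partition's index equals its position in the partitions array
nums.partitions.zipWithIndex.foreach { case (p, i) =>
  assert(p.index == i)
}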
dependencies¶
dependencies: Seq[Dependency[_]]
Final Method
dependencies is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
dependencies branches off based on checkpointRDD (and availability of CheckpointRDD).
With CheckpointRDD available (this RDD is checkpointed), dependencies returns a OneToOneDependency with the CheckpointRDD.
Otherwise, when this RDD is not checkpointed, dependencies requests getDependencies (and caches the result in dependencies_).
Note
getDependencies is an abstract method that custom RDDs are required to provide.
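For example, a narrow map transformation reports a OneToOneDependency while reduceByKey reports a ShuffleDependency (assuming a SparkContext sc):

val nums    = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)
val counts  = doubled.map((_, 1)).reduceByKey(_ + _)

println(doubled.dependencies.map(_.getClass.getSimpleName)) // e.g. List(OneToOneDependency)
println(counts.dependencies.map(_.getClass.getSimpleName))  // e.g. List(ShuffleDependency)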
Reliable Checkpointing¶
checkpoint(): Unit
Public API
checkpoint is part of the public API.
Procedure
checkpoint is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
checkpoint creates a new ReliableRDDCheckpointData (with this RDD) and saves it in checkpointData registry.
checkpoint does nothing when the checkpointData registry has already been defined.
checkpoint throws a SparkException when the checkpoint directory is not specified:
Checkpoint directory has not been set in the SparkContext
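A minimal usage sketch (the checkpoint directory is illustrative):

// a checkpoint directory has to be set first
sc.setCheckpointDir("/tmp/spark-checkpoints")

val nums = sc.parallelize(1 to 100).map(_ * 2)
nums.checkpoint() // only marks the RDD; nothing is saved yet

// the checkpoint is materialized with the first job (see doCheckpoint below)
nums.count()
assert(nums.isCheckpointed)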
RDDCheckpointData¶
checkpointData: Option[RDDCheckpointData[T]]
RDD defines checkpointData internal registry for a RDDCheckpointData[T] (of T type of this RDD).
The checkpointData registry is undefined (None) initially when this RDD is created and can hold a value after the following RDD API operators:
| RDD Operator | RDDCheckpointData |
|---|---|
| RDD.checkpoint | ReliableRDDCheckpointData |
| RDD.localCheckpoint | LocalRDDCheckpointData |
checkpointData is used when:
- isCheckpointedAndMaterialized
- isLocallyCheckpointed
- isReliablyCheckpointed
- getCheckpointFile
- doCheckpoint
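A sketch of the two RDD operators above that populate the checkpointData registry (assuming a SparkContext sc):

val nums = sc.parallelize(1 to 100).map(_ * 2)

// RDD.localCheckpoint registers a LocalRDDCheckpointData (no checkpoint directory needed)
nums.localCheckpoint()
assert(nums.isLocallyCheckpointed)

// RDD.checkpoint would instead register a ReliableRDDCheckpointData
// (and requires SparkContext.setCheckpointDir)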
CheckpointRDD¶
checkpointRDD: Option[CheckpointRDD[T]]
checkpointRDD returns the CheckpointRDD of the RDDCheckpointData (if defined, i.e. this RDD is checkpointed).
checkpointRDD is used when:
RDD is requested for the dependencies, partitions and preferred locations (all using final methods!)
doCheckpoint¶
doCheckpoint(): Unit
RDD.doCheckpoint, SparkContext.runJob and Dataset.checkpoint
doCheckpoint is called every time a Spark job is submitted (using SparkContext.runJob).
I found it quite interesting at the very least.
doCheckpoint is triggered when Dataset.checkpoint operator (Spark SQL) is executed (with eager flag on) which will likely trigger one or more Spark jobs on the underlying RDD anyway.
Procedure
doCheckpoint is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
Does nothing unless checkpointData is defined
My understanding is that doCheckpoint does nothing (noop) unless the RDDCheckpointData is defined.
doCheckpoint executes all the following in checkpoint scope.
doCheckpoint turns the doCheckpointCalled flag on (to prevent multiple executions).
doCheckpoint branches off based on whether a RDDCheckpointData is defined or not:
- With the RDDCheckpointData defined, doCheckpoint checks out the checkpointAllMarkedAncestors flag and, if enabled, first requests the Dependencies for their RDDs that are in turn requested to doCheckpoint themselves. doCheckpoint then requests the RDDCheckpointData to checkpoint.
- With the RDDCheckpointData undefined, doCheckpoint requests the Dependencies (of this RDD) for their RDDs that are in turn requested to doCheckpoint themselves (recursively).
Note
With the RDDCheckpointData defined, requesting doCheckpoint of the Dependencies is guarded by checkpointAllMarkedAncestors flag.
doCheckpoint skips execution if called earlier.
CheckpointRDD
A CheckpointRDD is not checkpointed again (and does nothing when requested to do so).
doCheckpoint is used when:
SparkContext is requested to run a job synchronously
iterator¶
iterator(
split: Partition,
context: TaskContext): Iterator[T]
iterator getOrComputes the given Partition when this RDD is persisted (the StorageLevel is not NONE) or computeOrReadCheckpoints it otherwise.
Final Method
iterator is a final method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.
getOrCompute¶
getOrCompute(
partition: Partition,
context: TaskContext): Iterator[T]
getOrCompute looks up the RDD block (of this RDD and the given Partition) in the BlockManager or, when not available, computeOrReadCheckpoints the Partition (and persists the result per the StorageLevel).
computeOrReadCheckpoint¶
computeOrReadCheckpoint(
split: Partition,
context: TaskContext): Iterator[T]
computeOrReadCheckpoint reads the given Partition from the parent CheckpointRDD when this RDD is checkpointed (and materialized) or computes it otherwise.
Debugging Recursive Dependencies¶
toDebugString: String
toDebugString returns an RDD Lineage Graph.
val wordCount = sc.textFile("README.md")
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
scala> println(wordCount.toDebugString)
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
+-(2) MapPartitionsRDD[20] at map at <console>:24 []
| MapPartitionsRDD[19] at flatMap at <console>:24 []
| README.md MapPartitionsRDD[18] at textFile at <console>:24 []
| README.md HadoopRDD[17] at textFile at <console>:24 []
toDebugString uses indentations to indicate a shuffle boundary.
The numbers in round brackets show the level of parallelism at each stage, e.g. (2) in the above output.
scala> println(wordCount.getNumPartitions)
2
With spark.logLineage enabled, toDebugString is printed out when executing an action.
$ ./bin/spark-shell --conf spark.logLineage=true
scala> sc.textFile("README.md", 4).count
...
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD's recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
| README.md HadoopRDD[0] at textFile at <console>:25 []
coalesce¶
coalesce(
numPartitions: Int,
shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null): RDD[T]
coalesce creates a CoalescedRDD with the given number of partitions (with an extra shuffle step over a ShuffledRDD when the shuffle flag is enabled).
coalesce is used when:
- RDD.repartition high-level operator is used
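A usage sketch (assuming a SparkContext sc):

val nums = sc.parallelize(1 to 100, numSlices = 8)

// shrink the number of partitions without a shuffle
val fewer = nums.coalesce(2)
assert(fewer.getNumPartitions == 2)

// RDD.repartition is coalesce with shuffle enabled (and so can also increase partitions)
val more = nums.repartition(16)
assert(more.getNumPartitions == 16)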
Implicit Methods¶
rddToOrderedRDDFunctions¶
rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]
rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions.
rddToOrderedRDDFunctions is used (implicitly) when ordered key-value operators (e.g. OrderedRDDFunctions.sortByKey) are used on an RDD of key-value pairs (with an Ordering of the keys in scope).
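For example, sortByKey is defined on OrderedRDDFunctions and becomes available on an RDD of pairs through this implicit conversion (an Ordering[String] is in scope; sc is a SparkContext):

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// compiles thanks to the implicit rddToOrderedRDDFunctions conversion
val sorted = pairs.sortByKey()
println(sorted.collect().toSeq) // e.g. List((a,1), (b,2), (c,3))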
withScope¶
withScope[U](
body: => U): U
withScope executes the given body in an RDD operation scope (RDDOperationScope.withScope) with this RDD's SparkContext.
Note
withScope is used for most (if not all) RDD API operators.
mapPartitionsWithEvaluator¶
mapPartitionsWithEvaluator[U: ClassTag](
evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U]
mapPartitionsWithEvaluator creates a MapPartitionsWithEvaluatorRDD for this RDD and the given PartitionEvaluatorFactory.
zipPartitionsWithEvaluator¶
zipPartitionsWithEvaluator[U: ClassTag](
rdd2: RDD[T],
evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U]
zipPartitionsWithEvaluator creates a ZippedPartitionsWithEvaluatorRDD for this RDD, the given RDD (rdd2), and the PartitionEvaluatorFactory.