Transformations -- Lazy Operations on RDD (to Create One or More RDDs)
Transformations are lazy operations on an rdd:RDD.md[RDD] that create one or many new RDDs.
// T and U are Scala types
transformation: RDD[T] => RDD[U]
transformation: RDD[T] => Seq[RDD[U]]
In other words, transformations are functions that take an RDD as the input and produce one or many RDDs as the output. Transformations do not change the input RDD (since rdd:index.md#introduction[RDDs are immutable] and hence cannot be modified), but produce one or more new RDDs by applying the computations they represent.
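The two shapes above can be sketched in a few lines. This is a minimal illustration, assuming it is pasted into spark-shell or run with Spark on the classpath; the local master, the application name, the sample data and the value names are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Assumption: a local master is enough for illustration.
// getOrCreate reuses an existing context (e.g. the one spark-shell provides).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("transformations-sketch"))

val numbers: RDD[Int] = sc.parallelize(1 to 10)

// RDD[T] => RDD[U]: map returns a new RDD and leaves `numbers` untouched
val labels: RDD[String] = numbers.map(n => s"n=$n")

// RDD[T] => Seq[RDD[U]]: randomSplit returns several RDDs at once
val splits: Array[RDD[Int]] = numbers.randomSplit(Array(0.5, 0.5), seed = 42L)

// The input RDD still holds its original records
val unchanged = numbers.collect().toSeq == (1 to 10)

// Every record of the input lands in exactly one of the splits
val total = splits.map(_.count()).sum
```

Note that `labels` and `splits` only describe computations; nothing runs until an action such as `collect` or `count` is called.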
[[methods]]
.(Subset of) RDD Transformations (Public API)
[cols="1m,3",options="header",width="100%"]
|===
| Method
| Description
| aggregate a| [[aggregate]]
aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
| barrier a| [[barrier]]
(New in 2.4.0) Marks the current stage as a barrier stage.
barrier creates an RDDBarrier.
| cache a| [[cache]]
Persists the RDD with the storage:StorageLevel.md#MEMORY_ONLY[MEMORY_ONLY] storage level
Synonym of <<persist, persist>>
| coalesce a| [[coalesce]]
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
| filter a| [[filter]]
filter(f: T => Boolean): RDD[T]
| flatMap a| [[flatMap]]
| map a| [[map]]
| mapPartitions a| [[mapPartitions]]
| mapPartitionsWithIndex a| [[mapPartitionsWithIndex]]
| randomSplit a| [[randomSplit]]
randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
| union a| [[union]]
++(other: RDD[T]): RDD[T]
union(other: RDD[T]): RDD[T]
| persist a| [[persist]]
persist(): this.type
persist(newLevel: StorageLevel): this.type
|===
By applying transformations you incrementally build an RDD lineage with all the parent RDDs of the final RDD(s).
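The lineage can be inspected from the final RDD. The following is a minimal sketch, assuming spark-shell or Spark on the classpath; the sample data and value names are illustrative. The text printed by `toDebugString` varies between Spark versions, so no output is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master is enough for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lineage-sketch"))

val words = sc.parallelize(Seq("a", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Prints the lineage of `counts`, i.e. the chain of parent RDDs
println(counts.toDebugString)

// Walk the lineage by hand: each dependency points at a parent RDD
val parent = counts.dependencies.head.rdd      // the RDD created by map
val grandparent = parent.dependencies.head.rdd // the RDD created by parallelize

// The walk ends at an RDD with no parents
val reachedSource = grandparent.id == words.id && words.dependencies.isEmpty
```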
Transformations are lazy, i.e. are not executed immediately. Only after calling an action are transformations executed.
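Laziness can be observed with an accumulator that counts how many records a function has visited. This is a sketch under the assumption of a local master with no task retries (accumulator updates made inside transformations may be applied more than once when tasks are re-executed); the names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master is enough for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("laziness-sketch"))

// Counts how many records the function passed to map has seen
val visited = sc.longAccumulator("visited")

val doubled = sc.parallelize(1 to 5).map { n =>
  visited.add(1)
  n * 2
}

// No action has been called yet, so the function has not run
val beforeAction = visited.sum

// collect is an action: it submits a job that executes the map
val result = doubled.collect()
val afterAction = visited.sum
```

Here `beforeAction` is 0 and `afterAction` is 5: defining `doubled` did no work, and the `collect` action ran the function once per record.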
After executing a transformation, the result RDD(s) will always be different from their parents and can be smaller (e.g. sample), bigger (e.g. cartesian) or the same size (e.g. map).
CAUTION: There are transformations that may trigger jobs, e.g. zipWithIndex.
.From SparkContext by transformations to the result
image::rdd-sparkcontext-transformations-action.png[align="center"]
Certain transformations can be pipelined, which is an optimization that Spark uses to improve the performance of computations.
scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD at textFile at
scala> val allWords = file.flatMap(_.split("\\W+"))
allWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD at flatMap at
scala> val words = allWords.filter(!_.isEmpty)
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD at filter at
scala> val pairs = words.map((_,1))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD at map at
scala> val reducedByKey = pairs.reduceByKey(_ + _)
reducedByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD at reduceByKey at
scala> val top10words = reducedByKey.takeOrdered(10)(Ordering[Int].reverse.on(_._2))
INFO SparkContext: Starting job: takeOrdered at
There are two kinds of transformations: narrow transformations and wide transformations.
=== [[narrow-transformations]] Narrow Transformations
Narrow transformations are the result of map, filter and similar operations that compute their output from the data of a single partition only, i.e. they are self-contained.
An output RDD has partitions with records that originate from a single partition in the parent RDD. Only a limited subset of partitions is used to calculate the result.
Spark groups narrow transformations into a single stage, which is called pipelining.
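Whether a transformation is narrow can be checked through the dependencies of the resulting RDD. The sketch below assumes spark-shell or Spark on the classpath; the data, the partition count and the value names are illustrative.

```scala
import org.apache.spark.{NarrowDependency, SparkConf, SparkContext}

// Assumption: a local master is enough for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("narrow-sketch"))

val source = sc.parallelize(1 to 100, 4)

// filter and map each read a single parent partition per output partition
val evens = source.filter(_ % 2 == 0).map(_ * 10)

// Every dependency of a narrow transformation is a NarrowDependency
val isNarrow = evens.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])

// The partitioning is carried over from the parent one-to-one
val samePartitionCount = evens.partitions.length == source.partitions.length
```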
=== [[wide-transformations]] Wide Transformations
Wide transformations are the result of operations such as groupByKey and reduceByKey. The data required to compute the records in a single partition may reside in many partitions of the parent RDD.
NOTE: Wide transformations are also called shuffle transformations, although they do not always require a shuffle (e.g. when the parent RDD is already partitioned by the required partitioner).
All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy this requirement, Spark must execute an RDD shuffle, which transfers data across the cluster and results in a new stage with a new set of partitions.
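The difference shows up in the dependencies of the result RDD. The following sketch assumes spark-shell or Spark on the classpath; the data, the `HashPartitioner` with 2 partitions and the value names are choices made for the example. It also illustrates a case where reduceByKey needs no shuffle because the parent RDD already uses the same partitioner.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

// Assumption: a local master is enough for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("wide-sketch"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// reduceByKey on an unpartitioned parent depends on a shuffle
val reduced = pairs.reduceByKey(_ + _)
val needsShuffle =
  reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]

// Partition the data by key up front (this step is itself a shuffle) ...
val partitioner = new HashPartitioner(2)
val prePartitioned = pairs.partitionBy(partitioner)

// ... so reducing with the same partitioner can stay within each partition
val reducedAgain = prePartitioned.reduceByKey(partitioner, _ + _)
val skipsShuffle =
  !reducedAgain.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]

val totals = reduced.collect().toMap
```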
zipWithIndex(): RDD[(T, Long)]
zipWithIndex zips this RDD[T] with its element indices.
If the number of partitions of the source RDD is greater than 1, it will submit an additional job to calculate start indices.
val onePartition = sc.parallelize(0 to 9, 1)
scala> onePartition.partitions.length
res0: Int = 1
// no job submitted
onePartition.zipWithIndex
val eightPartitions = sc.parallelize(0 to 9, 8)
scala> eightPartitions.partitions.length
res1: Int = 8
// submits a job
eightPartitions.zipWithIndex
.Spark job submitted by zipWithIndex transformation
image::spark-transformations-zipWithIndex-webui.png[align="center"]
====
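For reference, the indices that zipWithIndex assigns follow the partition order first and the order of records within each partition second. A minimal sketch, assuming spark-shell or Spark on the classpath, with illustrative data and names:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local master is enough for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("zipWithIndex-sketch"))

// Two partitions, so start indices have to be computed per partition
val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// Pairs every record with its position in the RDD
val indexed = letters.zipWithIndex()

val result = indexed.collect().toSeq
// result: Seq((a,0), (b,1), (c,2), (d,3))
```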