PairRDDFunctions

PairRDDFunctions is an extension of RDDs of (key, value) pairs (RDD[(K, V)]) with extra transformations.

PairRDDFunctions is available for RDDs of key-value pairs via a Scala implicit conversion (rddToPairRDDFunctions in the RDD companion object).
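
For example, calling reduceByKey on an RDD of pairs compiles because of that implicit conversion (a minimal spark-shell sketch, assuming a SparkContext named sc):

// reduceByKey is not defined on RDD itself; the call below compiles because
// RDD[(String, Int)] is implicitly converted to PairRDDFunctions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = pairs.reduceByKey(_ + _)
sums.collect.foreach(println)  // (a,4) and (b,2), in some order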

Table 1. PairRDDFunctions' Transformations
Method Description

aggregateByKey

aggregateByKey[U: ClassTag](
  zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey[U: ClassTag](
  zeroValue: U, numPartitions: Int)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey[U: ClassTag](
  zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

combineByKey

combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C): RDD[(K, C)]
combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  numPartitions: Int): RDD[(K, C)]
combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null): RDD[(K, C)]

countApproxDistinctByKey

countApproxDistinctByKey(
  relativeSD: Double = 0.05): RDD[(K, Long)]
countApproxDistinctByKey(
  relativeSD: Double,
  numPartitions: Int): RDD[(K, Long)]
countApproxDistinctByKey(
  relativeSD: Double,
  partitioner: Partitioner): RDD[(K, Long)]
countApproxDistinctByKey(
  p: Int,
  sp: Int,
  partitioner: Partitioner): RDD[(K, Long)]

flatMapValues

flatMapValues[U](
  f: V => TraversableOnce[U]): RDD[(K, U)]

foldByKey

foldByKey(
  zeroValue: V)(
    func: (V, V) => V): RDD[(K, V)]
foldByKey(
  zeroValue: V, numPartitions: Int)(
    func: (V, V) => V): RDD[(K, V)]
foldByKey(
  zeroValue: V,
  partitioner: Partitioner)(
    func: (V, V) => V): RDD[(K, V)]

mapValues

mapValues[U](
  f: V => U): RDD[(K, U)]

partitionBy

partitionBy(
  partitioner: Partitioner): RDD[(K, V)]

saveAsHadoopDataset

saveAsHadoopDataset(
  conf: JobConf): Unit

saveAsHadoopDataset uses the SparkHadoopWriter utility to write the key-value RDD out with a HadoopMapRedWriteConfigUtil (for the given Hadoop JobConf).

saveAsHadoopFile

saveAsHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: OutputFormat[_, _]],
  codec: Class[_ <: CompressionCodec]): Unit
saveAsHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: OutputFormat[_, _]],
  conf: JobConf = new JobConf(self.context.hadoopConfiguration),
  codec: Option[Class[_ <: CompressionCodec]] = None): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String)(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit

saveAsNewAPIHadoopDataset

saveAsNewAPIHadoopDataset(
  conf: Configuration): Unit

Saves this RDD of key-value pairs (RDD[(K, V)]) to any Hadoop-supported storage system using the new Hadoop API (with a Hadoop Configuration object for that storage system).

The configuration should set the relevant output parameters (an output format, output paths, e.g. a table name to write to) the same way they would be set for a Hadoop MapReduce job.

saveAsNewAPIHadoopDataset uses the SparkHadoopWriter utility to write the key-value RDD out with a HadoopMapReduceWriteConfigUtil (for the given Hadoop Configuration).

saveAsNewAPIHadoopFile

saveAsNewAPIHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
  conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String)(implicit fm: ClassTag[F]): Unit
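
As an illustration (a minimal sketch, assuming a SparkContext named sc and made-up post data), a few of the transformations above in action:

// Hypothetical (post, comma-separated tags) pairs.
val posts = sc.parallelize(Seq(
  ("post1", "spark,scala"),
  ("post2", "spark"),
  ("post1", "rdd")))

// flatMapValues: one (post, tag) pair per tag, keys untouched
val tags = posts.flatMapValues(_.split(","))

// mapValues: transform values only (the partitioner, if any, is preserved)
val ones = tags.mapValues(_ => 1)

// foldByKey: number of tags per post, starting from the zero value 0
val tagsPerPost = ones.foldByKey(0)(_ + _)

tagsPerPost.collect.foreach(println)  // (post1,3) and (post2,1), in some order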

groupByKey and reduceByKey Transformations

reduceByKey is essentially a special case of aggregateByKey in which the same function is used as both the sequence and the combine operation.
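
For per-key sums, for instance, the two are interchangeable (a minimal sketch, assuming an RDD[(String, Int)] named pairs):

// Same result: the reducing function is applied both within and across partitions.
val viaReduce    = pairs.reduceByKey(_ + _)
val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)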

You may want to look at the number of partitions from another angle.

It may often not be important to have a given number of partitions upfront (at RDD creation time, when loading data from data sources), so "regrouping" the data by key once it is already an RDD might be… the key (pun not intended).

You can use groupByKey or another PairRDDFunctions transformation to bring all the values for a key together in one processing flow.

You could also use partitionBy, which is available for RDDs of tuples, i.e. pair RDDs:

rdd.keyBy(_.kind)
  .partitionBy(new HashPartitioner(PARTITIONS))
  .foreachPartition(...)

Think of situations where kind has a low cardinality or a highly-skewed distribution; using this technique for partitioning may then not be an optimal solution.

You could instead do the following:

rdd.keyBy(_.kind).reduceByKey(....)

or mapValues, or plenty of other solutions.
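
For a classic word count (a minimal sketch, assuming a SparkContext named sc), reduceByKey is usually preferable to groupByKey since it combines values map-side before the shuffle:

val words = sc.parallelize(Seq("spark", "rdd", "spark")).map(word => (word, 1))

// groupByKey shuffles every single (word, 1) pair and aggregates only afterwards.
val viaGroup = words.groupByKey().mapValues(_.sum)

// reduceByKey combines values within each partition first (map-side combine),
// so less data goes over the wire.
val viaReduce = words.reduceByKey(_ + _)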

combineByKeyWithClassTag Transformations

combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] (1)
combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] (2)
combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]
1 Uses the default partitioner (defaultPartitioner)
2 Uses a HashPartitioner with numPartitions partitions

The combineByKeyWithClassTag transformations have mapSideCombine enabled (i.e. true) by default. They create a ShuffledRDD (passing on the mapSideCombine flag) when the given partitioner is different from the RDD's current partitioner.

combineByKeyWithClassTag is the base transformation that combineByKey, aggregateByKey, foldByKey, reduceByKey, countApproxDistinctByKey, and groupByKey are built on.
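
For example, a per-key average with combineByKey, which delegates to combineByKeyWithClassTag under the covers (a minimal sketch, assuming a SparkContext named sc):

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))

// Combiner type C = (sum, count)
val sumAndCount = scores.combineByKey(
  (v: Double) => (v, 1),                                               // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue (within a partition)
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners (across partitions)

val avgByKey = sumAndCount.mapValues { case (sum, count) => sum / count }

avgByKey.collect.foreach(println)  // (a,2.0) and (b,4.0), in some order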