PairRDDFunctions¶
PairRDDFunctions is an extension of the RDD API with additional high-level operators to work with key-value RDDs (RDD[(K, V)]).
PairRDDFunctions is made available on RDDs of key-value pairs through a Scala implicit conversion.
The gist of PairRDDFunctions is combineByKeyWithClassTag.
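The operators below become available on any RDD[(K, V)] without an explicit wrap. A minimal sketch (assuming a spark-shell session with sc available):

import org.apache.spark.rdd.RDD

val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey is defined on PairRDDFunctions, not RDD;
// the implicit conversion RDD.rddToPairRDDFunctions makes it available here.
val sums = pairs.reduceByKey(_ + _)
sums.collect // e.g. Array((a,4), (b,2))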
aggregateByKey¶
aggregateByKey[U: ClassTag](
zeroValue: U)(
seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] // (1)!
aggregateByKey[U: ClassTag](
zeroValue: U,
numPartitions: Int)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] // (2)!
aggregateByKey[U: ClassTag](
zeroValue: U,
partitioner: Partitioner)(
seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
aggregateByKey...FIXME
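A small usage sketch (the dataset is illustrative): aggregateByKey keeps the zero value, the per-partition seqOp and the cross-partition combOp separate, so the result type U can differ from the value type V.

// Per-key (sum, count) with an aggregate type (Int, Int) different from the value type Int
val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
val sumAndCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into a per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))   // combOp: merge accumulators across partitions
sumAndCount.collect // e.g. Array((a,(4,2)), (b,(2,1)))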
combineByKey¶
combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
combineByKey...FIXME
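A small usage sketch (the dataset is illustrative): combineByKey is the variant of combineByKeyWithClassTag that does not require a ClassTag for the combined type C, here used to collect values into per-key lists.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val valuesPerKey = pairs.combineByKey(
  (v: Int) => List(v),                           // createCombiner: first value of a key in a partition
  (c: List[Int], v: Int) => v :: c,              // mergeValue: add a value to an existing combiner
  (c1: List[Int], c2: List[Int]) => c1 ::: c2)   // mergeCombiners: merge combiners from different partitions
valuesPerKey.collect // e.g. Array((a,List(1, 3)), (b,List(2))); order within a list depends on partitioning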
combineByKeyWithClassTag¶
combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] // (1)!
combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] // (2)!
combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]
- Uses the default Partitioner
- Uses a HashPartitioner (with the given numPartitions)
combineByKeyWithClassTag creates an Aggregator for the given aggregation functions.
combineByKeyWithClassTag then branches off based on the given Partitioner.
If the given partitioner is the same as the RDD's current partitioner, combineByKeyWithClassTag simply uses mapPartitions on the RDD with the following:
- A function that combines the values by key using the Aggregator (applied to every partition iterator)
- preservesPartitioning flag turned on
If the given partitioner is different from the RDD's, combineByKeyWithClassTag creates a ShuffledRDD (with the Serializer, the Aggregator, and the mapSideCombine flag).
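A quick way to observe the branching (a sketch, assuming a spark-shell session with sc available): reusing the RDD's own partitioner avoids an extra shuffle, while a different partitioner introduces a new ShuffledRDD.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val prePartitioned = pairs.partitionBy(new HashPartitioner(4))

// Same partitioner as the RDD's: only a MapPartitionsRDD is added on top (no extra shuffle)
println(prePartitioned.reduceByKey(new HashPartitioner(4), _ + _).toDebugString)

// Different partitioner: a new ShuffledRDD is added
println(prePartitioned.reduceByKey(new HashPartitioner(8), _ + _).toDebugString)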
Usage¶
combineByKeyWithClassTag lays the foundation for the following high-level RDD key-value pair transformations:
- aggregateByKey
- combineByKey
- countApproxDistinctByKey
- foldByKey
- groupByKey
- reduceByKey
Requirements¶
combineByKeyWithClassTag requires that mergeCombiners is defined (non-null) or throws an IllegalArgumentException:
mergeCombiners must be defined
combineByKeyWithClassTag throws a SparkException when the keys are of an array type and the mapSideCombine flag is enabled:
Cannot use map-side combining with array keys.
combineByKeyWithClassTag throws a SparkException when the keys are of an array type and the partitioner is a HashPartitioner:
HashPartitioner cannot partition array keys.
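For illustration, a call that trips the first restriction (a sketch; reduceByKey enables map-side combining by default):

val arrayKeyed = sc.parallelize(Seq((Array(1), "a"), (Array(2), "b")))
// Expected to fail with: org.apache.spark.SparkException: Cannot use map-side combining with array keys.
arrayKeyed.reduceByKey(_ + _)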
Example¶
val nums = sc.parallelize(0 to 9, numSlices = 4)
val groups = nums.keyBy(_ % 2)
def createCombiner(n: Int) = {
println(s"createCombiner($n)")
n
}
def mergeValue(n1: Int, n2: Int) = {
println(s"mergeValue($n1, $n2)")
n1 + n2
}
def mergeCombiners(c1: Int, c2: Int) = {
println(s"mergeCombiners($c1, $c2)")
c1 + c2
}
val countByGroup = groups.combineByKeyWithClassTag(
createCombiner,
mergeValue,
mergeCombiners)
println(countByGroup.toDebugString)
/*
(4) ShuffledRDD[3] at combineByKeyWithClassTag at <console>:31 []
+-(4) MapPartitionsRDD[1] at keyBy at <console>:25 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
*/
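Collecting the result gives the per-key sums (ordering may differ between runs):

countByGroup.collect // e.g. Array((0,20), (1,25))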
countApproxDistinctByKey¶
countApproxDistinctByKey(
relativeSD: Double = 0.05): RDD[(K, Long)] // (1)!
countApproxDistinctByKey(
relativeSD: Double,
numPartitions: Int): RDD[(K, Long)] // (2)!
countApproxDistinctByKey(
relativeSD: Double,
partitioner: Partitioner): RDD[(K, Long)]
countApproxDistinctByKey(
p: Int,
sp: Int,
partitioner: Partitioner): RDD[(K, Long)]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
countApproxDistinctByKey...FIXME
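A small usage sketch (the dataset is illustrative): countApproxDistinctByKey estimates the number of distinct values per key using a HyperLogLog-based sketch, so the counts are approximate within the given relative standard deviation.

val visits = sc.parallelize(Seq(
  ("page1", "user1"), ("page1", "user2"), ("page1", "user1"), ("page2", "user3")))
val distinctUsers = visits.countApproxDistinctByKey(0.05)
distinctUsers.collect // e.g. Array((page1,2), (page2,1))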
foldByKey¶
foldByKey(
zeroValue: V)(
func: (V, V) => V): RDD[(K, V)] // (1)!
foldByKey(
zeroValue: V,
numPartitions: Int)(
func: (V, V) => V): RDD[(K, V)] // (2)!
foldByKey(
zeroValue: V,
partitioner: Partitioner)(
func: (V, V) => V): RDD[(K, V)]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
foldByKey...FIXME
foldByKey is used when:
- RDD.treeAggregate high-level operator is used
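A small usage sketch (the dataset is illustrative): foldByKey is like reduceByKey with a zero value; since the zero value may be applied once per partition, it has to be neutral for the function.

val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
val sums = pairs.foldByKey(0)(_ + _) // 0 is the neutral element for addition
sums.collect // e.g. Array((a,4), (b,2))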
groupByKey¶
groupByKey(): RDD[(K, Iterable[V])] // (1)!
groupByKey(
numPartitions: Int): RDD[(K, Iterable[V])] // (2)!
groupByKey(
partitioner: Partitioner): RDD[(K, Iterable[V])]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
groupByKey...FIXME
groupByKey is used when:
- RDD.groupBy high-level operator is used
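A small usage sketch (the dataset is illustrative): groupByKey collects all values of a key into a single Iterable, which can be expensive for skewed keys; prefer reduceByKey or aggregateByKey when a per-key reduction is enough.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()
grouped.mapValues(_.toList).collect // e.g. Array((a,List(1, 3)), (b,List(2)))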
partitionBy¶
partitionBy(
partitioner: Partitioner): RDD[(K, V)]
partitionBy...FIXME
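A small usage sketch (the dataset is illustrative): partitionBy returns an RDD laid out by the given Partitioner, so subsequent operations that use the same partitioner can avoid another shuffle.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val byKey = pairs.partitionBy(new HashPartitioner(4))
byKey.partitioner // Some(org.apache.spark.HashPartitioner@...)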
reduceByKey¶
reduceByKey(
func: (V, V) => V): RDD[(K, V)] // (1)!
reduceByKey(
func: (V, V) => V,
numPartitions: Int): RDD[(K, V)] // (2)!
reduceByKey(
partitioner: Partitioner,
func: (V, V) => V): RDD[(K, V)]
- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
reduceByKey can be seen as a particular case of aggregateByKey in which the result type matches the value type and a single function acts as both seqOp and combOp.
reduceByKey is used when:
- RDD.distinct high-level operator is used
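A small usage sketch (the dataset is illustrative): reduceByKey merges the values of each key with a single associative and commutative function, combining on the map side before the shuffle.

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect // e.g. Array((a,3), (b,2), (c,1))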
saveAsNewAPIHadoopFile¶
saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
path: String)(implicit fm: ClassTag[F]): Unit
saveAsNewAPIHadoopFile creates a new Job (Hadoop MapReduce) for the given Configuration (Hadoop).
saveAsNewAPIHadoopFile configures the Job (with the given keyClass, valueClass and outputFormatClass).
saveAsNewAPIHadoopFile sets the mapreduce.output.fileoutputformat.outputdir configuration property to the given path and delegates to saveAsNewAPIHadoopDataset.
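A possible usage sketch (the output format and path are illustrative): with the single-type-parameter variant only the path and the OutputFormat type are needed.

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
// Writes every (key, value) pair as a tab-separated line under the given directory
counts.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]]("/tmp/counts-newapi")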
saveAsNewAPIHadoopDataset¶
saveAsNewAPIHadoopDataset(
conf: Configuration): Unit
saveAsNewAPIHadoopDataset creates a new HadoopMapReduceWriteConfigUtil (with the given Configuration) and writes the RDD out.
The Configuration should have all the relevant output parameters set (an output format, output paths, e.g. a table name to write to), in the same way as it would be configured for a Hadoop MapReduce job.
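A possible usage sketch (paths and classes are illustrative): the Configuration is typically prepared with a Hadoop MapReduce Job and then passed in.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

val counts = sc
  .parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// Configure the output exactly as for a Hadoop MapReduce job
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/counts-dataset"))

counts.saveAsNewAPIHadoopDataset(job.getConfiguration)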