PairRDDFunctions¶
PairRDDFunctions is an extension of the RDD API with additional high-level operators for key-value RDDs (RDD[(K, V)]).
PairRDDFunctions is available on RDDs of key-value pairs via a Scala implicit conversion.
The gist of PairRDDFunctions is combineByKeyWithClassTag.
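As a quick illustration (a spark-shell sketch, assuming sc is a SparkContext), any RDD of pairs picks up the PairRDDFunctions operators through the implicit conversion:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))  // RDD[(String, Int)]
// reduceByKey is a PairRDDFunctions operator made available by the implicit conversion
pairs.reduceByKey(_ + _).collect  // e.g. Array((a,4), (b,2))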
aggregateByKey¶
aggregateByKey[U: ClassTag](
  zeroValue: U)(
  seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] // (1)!
aggregateByKey[U: ClassTag](
  zeroValue: U,
  numPartitions: Int)(
  seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] // (2)!
aggregateByKey[U: ClassTag](
  zeroValue: U,
  partitioner: Partitioner)(
  seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
aggregateByKey...FIXME
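As an illustration (a spark-shell sketch, assuming sc is available), aggregateByKey can compute a per-key (sum, count) pair:

val kv = sc.parallelize(Seq(("even", 2), ("odd", 1), ("even", 4), ("odd", 3)))
// zeroValue: the neutral (sum, count) accumulator
// seqOp: folds a single value into the per-partition accumulator
// combOp: merges accumulators coming from different partitions
val sumAndCount = kv.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
sumAndCount.collect  // e.g. Array((even,(6,2)), (odd,(4,2)))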
combineByKey¶
combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C): RDD[(K, C)] // (1)!
combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  numPartitions: Int): RDD[(K, C)] // (2)!
combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null): RDD[(K, C)]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
combineByKey...FIXME
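As an illustration (a spark-shell sketch, assuming sc is available), combineByKey can compute a per-key average by combining values into (sum, count) pairs:

val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 5.0)))
val avgByKey = scores.combineByKey(
  (v: Double) => (v, 1),                                              // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners
  .mapValues { case (sum, count) => sum / count }
avgByKey.collect  // e.g. Array((a,15.0), (b,5.0))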
combineByKeyWithClassTag¶
combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] // (1)!
combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] // (2)!
combineByKeyWithClassTag[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]

- Uses the default Partitioner
- Uses a HashPartitioner (with the given numPartitions)
combineByKeyWithClassTag creates an Aggregator for the given aggregation functions.
combineByKeyWithClassTag branches off per the given Partitioner.
If the given partitioner is the same as the RDD's, combineByKeyWithClassTag simply does mapPartitions on the RDD with the following arguments:
- A function that combines the values of a partition by key using the Aggregator
- preservesPartitioning flag turned on
If the given partitioner is different from the RDD's, combineByKeyWithClassTag creates a ShuffledRDD (with the Serializer, the Aggregator, and the mapSideCombine flag).
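The branching can be observed with toDebugString (a spark-shell sketch, assuming sc is available): reusing the RDD's own partitioner avoids a new ShuffledRDD, while a different partitioner introduces one.

import org.apache.spark.HashPartitioner
val p = new HashPartitioner(4)
val prePartitioned = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(p)
// Same partitioner as the RDD's: the reduceByKey step adds only a MapPartitionsRDD
println(prePartitioned.reduceByKey(p, _ + _).toDebugString)
// Different partitioner: the reduceByKey step adds a ShuffledRDD
println(prePartitioned.reduceByKey(new HashPartitioner(8), _ + _).toDebugString)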
Usage¶
combineByKeyWithClassTag lays the foundation for high-level RDD key-value pair transformations such as aggregateByKey, combineByKey, countApproxDistinctByKey, foldByKey, groupByKey and reduceByKey.
Requirements¶
combineByKeyWithClassTag requires that the mergeCombiners function is defined (not null) or throws an IllegalArgumentException:
mergeCombiners must be defined
combineByKeyWithClassTag throws a SparkException when the keys are of an array type and the mapSideCombine flag is enabled:
Cannot use map-side combining with array keys.
combineByKeyWithClassTag throws a SparkException when the keys are of an array type and the partitioner is a HashPartitioner:
HashPartitioner cannot partition array keys.
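A short sketch of the array-key restriction (assuming sc is available); the exception is thrown eagerly, when the transformation is defined, since reduceByKey uses map-side combining:

val arrayKeyed = sc.parallelize(Seq((Array(1), "x"), (Array(1), "y")))
// Throws org.apache.spark.SparkException: Cannot use map-side combining with array keys.
arrayKeyed.reduceByKey(_ + _)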
Example¶
val nums = sc.parallelize(0 to 9, numSlices = 4)
val groups = nums.keyBy(_ % 2)
def createCombiner(n: Int) = {
  println(s"createCombiner($n)")
  n
}
def mergeValue(n1: Int, n2: Int) = {
  println(s"mergeValue($n1, $n2)")
  n1 + n2
}
def mergeCombiners(c1: Int, c2: Int) = {
  println(s"mergeCombiners($c1, $c2)")
  c1 + c2
}
val countByGroup = groups.combineByKeyWithClassTag(
  createCombiner,
  mergeValue,
  mergeCombiners)
println(countByGroup.toDebugString)
/*
(4) ShuffledRDD[3] at combineByKeyWithClassTag at <console>:31 []
+-(4) MapPartitionsRDD[1] at keyBy at <console>:25 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
*/
countApproxDistinctByKey¶
countApproxDistinctByKey(
  relativeSD: Double = 0.05): RDD[(K, Long)] // (1)!
countApproxDistinctByKey(
  relativeSD: Double,
  numPartitions: Int): RDD[(K, Long)] // (2)!
countApproxDistinctByKey(
  relativeSD: Double,
  partitioner: Partitioner): RDD[(K, Long)]
countApproxDistinctByKey(
  p: Int,
  sp: Int,
  partitioner: Partitioner): RDD[(K, Long)]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
countApproxDistinctByKey...FIXME
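As an illustration (a spark-shell sketch, assuming sc is available; the counts are approximate, not exact):

val events = sc.parallelize((1 to 10000).map(n => (n % 2, n % 100)))
// Approximate number of distinct values per key
events.countApproxDistinctByKey(relativeSD = 0.05).collect
// e.g. Array((0,50), (1,50)) -- 50 distinct values per key, up to the relative accuracy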
foldByKey¶
foldByKey(
  zeroValue: V)(
  func: (V, V) => V): RDD[(K, V)] // (1)!
foldByKey(
  zeroValue: V,
  numPartitions: Int)(
  func: (V, V) => V): RDD[(K, V)] // (2)!
foldByKey(
  zeroValue: V,
  partitioner: Partitioner)(
  func: (V, V) => V): RDD[(K, V)]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
foldByKey...FIXME
foldByKey is used when:
- RDD.treeAggregate high-level operator is used
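As an illustration (a spark-shell sketch, assuming sc is available):

val kv = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
// zeroValue (0) is the neutral element of func; func merges values per key
kv.foldByKey(0)(_ + _).collect  // e.g. Array((a,3), (b,3))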
groupByKey¶
groupByKey(): RDD[(K, Iterable[V])] // (1)!
groupByKey(
  numPartitions: Int): RDD[(K, Iterable[V])] // (2)!
groupByKey(
  partitioner: Partitioner): RDD[(K, Iterable[V])]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
groupByKey...FIXME
groupByKey is used when:
- RDD.groupBy high-level operator is used
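As an illustration (a spark-shell sketch, assuming sc is available):

val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
kv.groupByKey().mapValues(_.toList).collect  // e.g. Array((a,List(1, 3)), (b,List(2)))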
partitionBy¶
partitionBy(
  partitioner: Partitioner): RDD[(K, V)]
partitionBy...FIXME
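As an illustration (a spark-shell sketch, assuming sc is available), partitionBy redistributes the RDD according to the given Partitioner:

import org.apache.spark.HashPartitioner
val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), numSlices = 3)
val byKey = kv.partitionBy(new HashPartitioner(2))
byKey.getNumPartitions  // 2
byKey.partitioner       // Some(HashPartitioner) with 2 partitions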
reduceByKey¶
reduceByKey(
  func: (V, V) => V): RDD[(K, V)] // (1)!
reduceByKey(
  func: (V, V) => V,
  numPartitions: Int): RDD[(K, V)] // (2)!
reduceByKey(
  partitioner: Partitioner,
  func: (V, V) => V): RDD[(K, V)]

- Uses the default Partitioner
- Creates a HashPartitioner with the given numPartitions partitions
reduceByKey is essentially a particular case of aggregateByKey (with the value type as the aggregation type and the same function used to merge values and partial results).
reduceByKey is used when:
- RDD.distinct high-level operator is used
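As an illustration (a spark-shell sketch, assuming sc is available), a per-key sum can be expressed with either operator:

val kv = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
kv.reduceByKey(_ + _).collect               // e.g. Array((a,3), (b,3))
kv.aggregateByKey(0)(_ + _, _ + _).collect  // the same result for a sum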
saveAsNewAPIHadoopFile¶
saveAsNewAPIHadoopFile(
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
  conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String)(implicit fm: ClassTag[F]): Unit
saveAsNewAPIHadoopFile creates a new Job (Hadoop MapReduce) for the given Configuration (Hadoop).
saveAsNewAPIHadoopFile configures the Job (with the given keyClass, valueClass and outputFormatClass).
saveAsNewAPIHadoopFile sets the mapreduce.output.fileoutputformat.outputdir configuration property to the given path and saves the RDD using saveAsNewAPIHadoopDataset.
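As an illustration (a spark-shell sketch; the output directory /tmp/pair-rdd-out is hypothetical), writing a pair RDD with the new Hadoop API TextOutputFormat:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
counts.saveAsNewAPIHadoopFile(
  "/tmp/pair-rdd-out",  // hypothetical output directory
  classOf[Text],
  classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])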
saveAsNewAPIHadoopDataset¶
saveAsNewAPIHadoopDataset(
  conf: Configuration): Unit
saveAsNewAPIHadoopDataset creates a new HadoopMapReduceWriteConfigUtil (with the given Configuration) and writes the RDD out.
The Configuration should have all the relevant output parameters set (an output format and output paths, e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
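As an illustration (a spark-shell sketch; the output path /tmp/pair-rdd-dataset is hypothetical), the Configuration can be prepared the same way as for a Hadoop MapReduce job:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// Configure the output format, key/value classes and output path on a Hadoop Job
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/pair-rdd-dataset"))  // hypothetical path

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)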