# BaseShuffleHandle

`BaseShuffleHandle` is a `ShuffleHandle` that captures the parameters of a shuffle when `SortShuffleManager` is requested for a `ShuffleHandle` (and the other, specialized `ShuffleHandle`s could not be selected):

- Shuffle ID
- ShuffleDependency
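
`BaseShuffleHandle` is little more than a holder for those two parameters. Below is a trimmed-down sketch of its definition (the class in the Spark sources also carries `private[spark]` visibility):

```scala
import org.apache.spark.ShuffleDependency
import org.apache.spark.shuffle.ShuffleHandle

// Trimmed-down sketch of Spark's BaseShuffleHandle: it records the shuffle ID
// (through the ShuffleHandle superclass) and the ShuffleDependency, nothing more.
class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)
```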
## Extensions

- `BypassMergeSortShuffleHandle`
- `SerializedShuffleHandle`
## Demo
Start a Spark application (e.g., `spark-shell`) with the following Spark properties to trigger selection of `BaseShuffleHandle`:

- `spark.shuffle.spill.numElementsForceSpillThreshold=1`
- `spark.shuffle.sort.bypassMergeThreshold=1`

```text
./bin/spark-shell \
  --conf spark.shuffle.spill.numElementsForceSpillThreshold=1 \
  --conf spark.shuffle.sort.bypassMergeThreshold=1
```
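
If you prefer a standalone application over `spark-shell`, the same properties can be set programmatically. A minimal sketch (the application name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the same Spark properties set when building the SparkSession
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("BaseShuffleHandle demo") // arbitrary name
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "1")
  .config("spark.shuffle.sort.bypassMergeThreshold", "1")
  .getOrCreate()
val sc = spark.sparkContext
```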
Create an RDD with the number of partitions (`numSlices`) greater than the value of the `spark.shuffle.sort.bypassMergeThreshold` configuration property.
```scala
val rdd = sc.parallelize(0 to 4, numSlices = 2).groupBy(_ % 2)
assert(rdd.getNumPartitions == 2)
```
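
`groupBy` delegates to `groupByKey`, which creates an `Aggregator` for the shuffle but explicitly disables map-side combining (buffering all values map-side would not reduce the amount of data shuffled). Paraphrasing the relevant part of `PairRDDFunctions.groupByKey` from the Spark sources:

```scala
// Paraphrased from PairRDDFunctions.groupByKey (Spark sources): the three
// combiner functions make up the Aggregator of the ShuffleDependency, while
// mapSideCombine is explicitly disabled.
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
```

That is why the `DEBUG` message below reports that serialized shuffle cannot be used: an aggregator is defined for the shuffle.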
```text
scala> rdd.dependencies
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1160c54b)
```
```scala
import org.apache.spark.ShuffleDependency
val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]

// groupBy defines an aggregator, yet disables map-side combine
assert(shuffleDep.mapSideCombine == false)
assert(shuffleDep.aggregator.isDefined)
```
```text
scala> shuffleDep.aggregator.get.getClass
val res11: Class[_ <: org.apache.spark.Aggregator[Int,Int,Int]] = class org.apache.spark.Aggregator
```
Note that the number of reduce partitions (2) is greater than the `spark.shuffle.sort.bypassMergeThreshold` configuration property (1), so `BypassMergeSortShuffleHandle` could not be selected either.

```scala
assert(shuffleDep.partitioner.numPartitions == 2)
```
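
The bypass decision itself is a simple predicate. The following closely paraphrases `SortShuffleWriter.shouldBypassMergeSort` from the Spark sources (the real method reads the threshold through Spark's internal config entries):

```scala
import org.apache.spark.{ShuffleDependency, SparkConf}

// Close paraphrase of SortShuffleWriter.shouldBypassMergeSort (Spark sources):
// bypass requires no map-side combine and at most bypassMergeThreshold
// reduce partitions. In this demo: 2 partitions > threshold 1, so no bypass.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```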
```text
scala> print(shuffleDep.shuffleHandle)
org.apache.spark.shuffle.sort.BaseShuffleHandle@2b648069
```
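
That is exactly the fallback behaviour of `SortShuffleManager.registerShuffle`: with bypass ruled out by the partition count and serialized shuffle ruled out by the aggregator, `BaseShuffleHandle` is what remains. A close paraphrase of the selection logic in the Spark sources:

```scala
// Close paraphrase of SortShuffleManager.registerShuffle (Spark sources):
// BaseShuffleHandle is the fallback when neither specialized handle applies.
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // Few reduce partitions, no map-side combine: one file per reduce partition
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Relocatable serializer, no aggregator, few enough partitions
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Everything else, including this demo's shuffle with an aggregator
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
```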