BaseShuffleHandle

BaseShuffleHandle is a ShuffleHandle that captures the parameters of a shuffle when SortShuffleManager is requested for a ShuffleHandle and none of the more specialized ShuffleHandles could be selected.
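
The fallback nature of BaseShuffleHandle can be illustrated with a small, self-contained sketch. It is a simplified model, not Spark's code: HandleKind, DepModel, HandleSelection and MaxSerializedPartitions are hypothetical names, and the conditions (no map-side combine, reduce partitions within spark.shuffle.sort.bypassMergeThreshold, a serializer that supports relocation of serialized objects, a partition limit of 2^24) are assumptions about how the specialized handles are ruled out.

```scala
// Simplified, hypothetical model of ShuffleHandle selection (not Spark's classes).
sealed trait HandleKind
case object BypassMergeSort extends HandleKind
case object Serialized extends HandleKind
case object Base extends HandleKind

// Only the properties of a shuffle dependency that this sketch looks at (assumed).
final case class DepModel(
    numPartitions: Int,
    mapSideCombine: Boolean,
    serializerSupportsRelocation: Boolean)

object HandleSelection {
  // Assumed upper bound on reduce partitions for the serialized mode (2^24).
  val MaxSerializedPartitions: Int = 1 << 24

  // Try the specialized kinds first; Base is what remains when neither applies.
  def select(dep: DepModel, bypassMergeThreshold: Int): HandleKind =
    if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
      BypassMergeSort
    } else if (dep.serializerSupportsRelocation &&
               !dep.mapSideCombine &&
               dep.numPartitions <= MaxSerializedPartitions) {
      Serialized
    } else {
      Base
    }
}
```

In this model, two reduce partitions with a threshold of 1 rule out the bypass kind. Whether the result is then Serialized or Base depends on the map-side combine and serializer flags.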

Extensions

Demo

Start a Spark application (e.g., spark-shell) with the following Spark properties to trigger selection of BaseShuffleHandle:

  • spark.shuffle.spill.numElementsForceSpillThreshold=1
  • spark.shuffle.sort.bypassMergeThreshold=1
./bin/spark-shell \
    --conf spark.shuffle.spill.numElementsForceSpillThreshold=1 \
    --conf spark.shuffle.sort.bypassMergeThreshold=1

Create an RDD with more partitions (numSlices) than the value of the spark.shuffle.sort.bypassMergeThreshold configuration property.

val rdd = sc.parallelize(0 to 4, numSlices = 2).groupBy(_ % 2)
assert(rdd.getNumPartitions == 2)
scala> rdd.dependencies
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1160c54b)
import org.apache.spark.ShuffleDependency
val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]
// mapSideCombine is disabled
assert(!shuffleDep.mapSideCombine)
// an aggregator is defined
assert(shuffleDep.aggregator.isDefined)
scala> shuffleDep.aggregator.get.getClass
val res11: Class[_ <: org.apache.spark.Aggregator[Int,Int,Int]] = class org.apache.spark.Aggregator

Note that the number of reduce partitions (2) is greater than the value of the spark.shuffle.sort.bypassMergeThreshold configuration property (1).

assert(shuffleDep.partitioner.numPartitions == 2)
scala> print(shuffleDep.shuffleHandle)
org.apache.spark.shuffle.sort.SerializedShuffleHandle@2b648069