BypassMergeSortShuffleHandle

BypassMergeSortShuffleHandle is a BaseShuffleHandle that SortShuffleManager uses when it can avoid merge-sorting data (when requested to register a shuffle).

BypassMergeSortShuffleHandle tells SortShuffleManager to use BypassMergeSortShuffleWriter when requested for a ShuffleWriter.
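
The condition under which merge-sorting can be avoided can be sketched as below. This is a simplified, Spark-free model of the check that Spark performs in SortShuffleWriter.shouldBypassMergeSort: no map-side combine, and no more reduce partitions than spark.shuffle.sort.bypassMergeThreshold (200 by default). ShuffleInfo and BypassRule are hypothetical names used only for illustration and are not part of Spark.

```scala
// Hypothetical stand-in for the two ShuffleDependency properties the rule looks at.
// Not a Spark class.
final case class ShuffleInfo(mapSideCombine: Boolean, numPartitions: Int)

object BypassRule {
  // Default value of spark.shuffle.sort.bypassMergeThreshold
  val DefaultBypassMergeThreshold: Int = 200

  // Merge-sort can be bypassed only when there is no map-side combine
  // and the number of reduce partitions does not exceed the threshold.
  def shouldBypassMergeSort(
      dep: ShuffleInfo,
      threshold: Int = DefaultBypassMergeThreshold): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= threshold
}
```

Under these assumptions, a shuffle with 3 partitions and map-side combine disabled (as in the demo below) qualifies for the bypass path, while enabling map-side combine or going past the threshold does not.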

Creating Instance

BypassMergeSortShuffleHandle takes the following to be created:

BypassMergeSortShuffleHandle is created when SortShuffleManager is requested to register a shuffle (and can avoid merge-sorting data).

Demo

val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)

assert(rdd.dependencies.length == 1)

import org.apache.spark.ShuffleDependency
val shuffleDep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int, Int]]

assert(shuffleDep.mapSideCombine == false, "mapSideCombine should be disabled")
assert(shuffleDep.aggregator.isDefined)
// Use ':paste -raw' mode to paste the code
package org.apache.spark
object open {
  import org.apache.spark.SparkContext
  def bypassMergeThreshold(sc: SparkContext) = {
    import org.apache.spark.internal.config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD
    sc.getConf.get(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
  }
}
import org.apache.spark.open
val bypassMergeThreshold = open.bypassMergeThreshold(sc)

assert(shuffleDep.partitioner.numPartitions < bypassMergeThreshold)
import org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle
// BypassMergeSortShuffleHandle is private[spark]
// so the following won't work :(
// assert(shuffleDep.shuffleHandle.isInstanceOf[BypassMergeSortShuffleHandle[Int, Int]])
assert(shuffleDep.shuffleHandle.toString.contains("BypassMergeSortShuffleHandle"))