BypassMergeSortShuffleHandle
BypassMergeSortShuffleHandle
is a BaseShuffleHandle that SortShuffleManager
uses when it can avoid merge-sorting data (when requested to register a shuffle).
BypassMergeSortShuffleHandle
tells SortShuffleManager
to use BypassMergeSortShuffleWriter when requested for a ShuffleWriter.
Creating Instance
BypassMergeSortShuffleHandle
takes the following to be created:
- Shuffle ID
- ShuffleDependency
BypassMergeSortShuffleHandle
is created when:
SortShuffleManager
is requested for a ShuffleHandle (for the ShuffleDependency)
Demo
import org.apache.spark.ShuffleDependency

// Build a grouped RDD; the demo expects it to have exactly one dependency.
val rdd = sc.parallelize(0 to 8).groupBy(_ % 3)
assert(rdd.dependencies.size == 1)

// Treat that single dependency as a ShuffleDependency so its shuffle-related
// properties (mapSideCombine, aggregator, partitioner, shuffleHandle) can be inspected.
val shuffleDep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int, Int]]

// The dependency carries an aggregator, but map-side combining must be off.
assert(!shuffleDep.mapSideCombine, "mapSideCombine should be disabled")
assert(shuffleDep.aggregator.nonEmpty)
// Use ':paste -raw' mode to paste the code
package org.apache.spark

import org.apache.spark.SparkContext
import org.apache.spark.internal.config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD

/**
 * Demo-only helper that exposes the bypass-merge threshold to spark-shell code.
 *
 * NOTE(review): declared inside the `org.apache.spark` package presumably because
 * the config entry and/or its accessor are `private[spark]` -- confirm against the
 * Spark version in use.
 */
object open {

  /**
   * Looks up the value of `SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD` in the
   * configuration of the given `SparkContext`.
   *
   * @param sc the active SparkContext whose configuration is read
   * @return the configured bypass-merge threshold
   */
  def bypassMergeThreshold(sc: SparkContext) =
    sc.getConf.get(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
}
import org.apache.spark.open
val bypassMergeThreshold = open.bypassMergeThreshold(sc)
// Merge-sorting is bypassed when the number of reduce partitions is AT MOST
// spark.shuffle.sort.bypassMergeThreshold, i.e. the bound is inclusive.
// A strict `<` would wrongly fail when numPartitions equals the threshold.
assert(shuffleDep.partitioner.numPartitions <= bypassMergeThreshold)
import org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle
// BypassMergeSortShuffleHandle is private[spark]
// so the following won't work :(
// assert(shuffleDep.shuffleHandle.isInstanceOf[BypassMergeSortShuffleHandle[Int, Int]])
// Fall back to checking the handle's string representation for the class name.
assert(shuffleDep.shuffleHandle.toString.contains("BypassMergeSortShuffleHandle"))