
ShuffledHashJoinExec Physical Operator

ShuffledHashJoinExec is a ShuffledJoin and HashJoin for shuffle-hash join.

ShuffledHashJoinExec supports Java code generation for all the join types except FullOuter (variable prefix: shj).

Performance Metrics

Key            Name (in web UI)         Description
numOutputRows  number of output rows    Number of output rows
buildDataSize  data size of build side
buildTime      time to build hash map

ShuffledHashJoinExec in web UI (Details for Query)

Creating Instance

ShuffledHashJoinExec takes the following to be created:

ShuffledHashJoinExec is created when:

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


Review Me

doExecute requests the streamedPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute requests the buildPlan physical operator to execute (and generate an RDD[InternalRow]).

doExecute zips the two RDDs partition-wise (using the RDD.zipPartitions method with the preservesPartitioning flag disabled).

The function applied to the zipped partitions takes two iterators of rows, one from a partition of streamedPlan and one from the matching partition of buildPlan.

For every pair of zipped partitions, the function builds a HashedRelation from the buildPlan partition (using buildHashedRelation) and then joins the streamedPlan partition iterator with the HashedRelation (passing in the numOutputRows and avgHashProbe metrics).
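
The steps above can be sketched in plain Scala without Spark. This is a simplified model and not the operator's actual code: partitions are plain collections, the HashedRelation is stood in for by an immutable Map, only an inner join is shown, and the names ZipPartitionsJoinSketch, buildRelation, joinPartition and execute are made up for illustration.

```scala
object ZipPartitionsJoinSketch {
  // Assumption: a row is just a (join key, value) pair.
  type Row = (Int, String)

  // Stand-in for buildHashedRelation: index the build-side rows by join key.
  def buildRelation(buildIter: Iterator[Row]): Map[Int, List[String]] =
    buildIter.toList.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

  // Stand-in for the join step: probe the relation with every streamed row.
  def joinPartition(
      streamedIter: Iterator[Row],
      relation: Map[Int, List[String]]): Iterator[(Int, String, String)] =
    streamedIter.flatMap { case (k, v) =>
      relation.getOrElse(k, Nil).iterator.map(b => (k, v, b))
    }

  // Stand-in for RDD.zipPartitions: pair up partitions with the same index.
  def execute(
      streamed: Seq[Seq[Row]],
      build: Seq[Seq[Row]]): Seq[Seq[(Int, String, String)]] = {
    require(streamed.size == build.size, "both sides need the same number of partitions")
    streamed.zip(build).map { case (s, b) =>
      joinPartition(s.iterator, buildRelation(b.iterator)).toList
    }
  }
}
```

The sketch only produces correct results when rows with the same key sit in partitions with the same index on both sides, which is what the hash-partitioning Exchange operators below the join guarantee in a real plan.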

Building HashedRelation

buildHashedRelation(
  iter: Iterator[InternalRow]): HashedRelation


Review Me

buildHashedRelation creates a HashedRelation (for the input iter iterator of InternalRows, buildKeys and the current TaskMemoryManager).

buildHashedRelation records the time to create the HashedRelation as buildTime.

buildHashedRelation requests the HashedRelation for estimatedSize that is recorded as buildDataSize.

buildHashedRelation is used when:

  • ShuffledHashJoinExec is requested to execute (when the streamedPlan and buildPlan physical operators are executed and their RDDs are zipped partition-wise using the RDD.zipPartitions method)
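
The metric bookkeeping described above can be sketched in plain Scala. This is a rough model under stated assumptions and not Spark's code: the Metrics class stands in for the buildTime and buildDataSize SQL metrics, the relation is a plain Map, and estimatedSize is a made-up estimate (4 bytes per key plus the length of every value).

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

object BuildHashedRelationSketch {
  // Hypothetical stand-in for the buildTime and buildDataSize metrics.
  final class Metrics {
    var buildTimeMs: Long = 0L
    var buildDataSize: Long = 0L
  }

  type Row = (Int, String)
  type Relation = Map[Int, List[String]]

  // Hypothetical size estimate: 4 bytes per key plus the length of every value.
  def estimatedSize(relation: Relation): Long =
    relation.map { case (_, values) => 4L + values.map(_.length.toLong).sum }.sum

  def buildHashedRelation(iter: Iterator[Row], metrics: Metrics): Relation = {
    val start = System.nanoTime()
    // Build the relation from the build-side rows.
    val relation: Relation =
      iter.toList.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Record how long the build took and how large the result is estimated to be.
    metrics.buildTimeMs += NANOSECONDS.toMillis(System.nanoTime() - start)
    metrics.buildDataSize += estimatedSize(relation)
    relation
  }
}
```

Both metrics are accumulated with += because the function runs once per partition, so the totals add up across all tasks of the join.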


supportCodegen: Boolean

supportCodegen is part of the CodegenSupport abstraction.

supportCodegen is true for all the join types except FullOuter.


needCopyResult: Boolean

needCopyResult is part of the CodegenSupport abstraction.

needCopyResult is true.


Enable the DEBUG logging level for the ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.

// Meet JoinSelection's requirements for selecting ShuffledHashJoinExec
// 1. Effectively disable auto broadcasting
// JoinSelection (canBuildLocalHashMap specifically) requires that
// plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
// so autoBroadcastJoinThreshold has to be at least 1
// (a non-positive threshold could never satisfy the inequality)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> println(spark.sessionState.conf.numShufflePartitions)

// 2. Disable preference on SortMergeJoin
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

val dataset = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "ShuffledHashJoinExec")
).toDF("id", "token")
// Self LEFT SEMI join
val q = dataset.join(dataset, Seq("id"), "leftsemi")

val sizeInBytes = q.queryExecution.optimizedPlan.stats.sizeInBytes
scala> println(sizeInBytes)

// 3. canBuildRight is on for leftsemi (hence BuildRight in the physical plan)

// the right join side is at least three times smaller than the left side
// Even though it's a self LEFT SEMI join there are two different join sides
// How is that possible?

// BINGO! ShuffledHashJoin is here!

// Enable DEBUG logging level
import org.apache.log4j.{Level, Logger}
val logger = "org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys"
Logger.getLogger(logger).setLevel(Level.DEBUG)

// ShuffledHashJoin with BuildRight
scala> q.explain
== Physical Plan ==
ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
:- Exchange hashpartitioning(id#37, 200)
:  +- LocalTableScan [id#37, token#38]
+- Exchange hashpartitioning(id#41, 200)
   +- LocalTableScan [id#41]

scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
01 :- Exchange hashpartitioning(id#37, 200)
02 :  +- LocalTableScan [id#37, token#38]
03 +- Exchange hashpartitioning(id#41, 200)
04    +- LocalTableScan [id#41]
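
The two size checks mentioned in the comments of the listing above can be sketched as plain functions. This is a simplified model and not Spark's JoinSelection code: the object name and the parameter lists are assumptions for illustration, and the sizes are passed in directly instead of being read from plan statistics.

```scala
object JoinSelectionSketch {
  // The condition quoted in the listing:
  // plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
  def canBuildLocalHashMap(
      sizeInBytes: BigInt,
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions

  // "At least three times smaller": side a must be at most a third of side b.
  def muchSmaller(a: BigInt, b: BigInt): Boolean =
    a * 3 <= b
}
```

With a threshold of 1 and 200 shuffle partitions, any build side under 200 bytes passes the first check, while a threshold of -1 can never pass it.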

doExecute generates a ZippedPartitionsRDD2 that you can see in an RDD lineage.

scala> println(q.queryExecution.toRdd.toDebugString)
(200) ZippedPartitionsRDD2[8] at toRdd at <console>:26 []
  |   ShuffledRowRDD[3] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[2] at toRdd at <console>:26 []
     |  MapPartitionsRDD[1] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[0] at toRdd at <console>:26 []
  |   ShuffledRowRDD[7] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[6] at toRdd at <console>:26 []
     |  MapPartitionsRDD[5] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[4] at toRdd at <console>:26 []




buildSide: BuildSide

ShuffledHashJoinExec is given a BuildSide when created.

buildSide is part of the HashJoin abstraction.


prepareRelation(
  ctx: CodegenContext): HashedRelationInfo

prepareRelation requests the given CodegenContext for the code to reference this ShuffledHashJoinExec (under the plan name).

prepareRelation requests the given CodegenContext for the code of the relationTerm mutable state (with the HashedRelation class name, the relation variable name, etc.).

In the end, prepareRelation creates a HashedRelationInfo.

prepareRelation is part of the HashJoin abstraction.


ShuffledHashJoinExec is a ShuffledJoin and performs a hash join of two child relations by first shuffling the data using the join keys.

isSkewJoin Flag

ShuffledHashJoinExec can be given the isSkewJoin flag when created. The flag is disabled (false) by default.

isSkewJoin can only be enabled (true) when the OptimizeSkewedJoin adaptive physical optimization is requested to optimize a skew join.

isSkewJoin is part of the ShuffledJoin abstraction.