ShuffledHashJoinExec Physical Operator
ShuffledHashJoinExec is a ShuffledJoin and a HashJoin for shuffle-hash joins.
ShuffledHashJoinExec supports Java code generation for all the join types except FullOuter (variable prefix: shj).
Performance Metrics
Key | Name (in web UI) | Description |
---|---|---|
numOutputRows | number of output rows | Number of output rows |
buildDataSize | data size of build side | Estimated size of the HashedRelation built for the build side |
buildTime | time to build hash map | Time to create the HashedRelation |
Creating Instance
ShuffledHashJoinExec takes the following to be created:
- Left Key Expressions
- Right Key Expressions
- Join Type
- BuildSide
- Optional Join Condition Expression
- Left Child Physical Operator
- Right Child Physical Operator
- isSkewJoin flag
ShuffledHashJoinExec is created when:
- JoinSelection execution planning strategy is executed (createShuffleHashJoin)
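Besides the configuration-based route shown in the Demo, the `shuffle_hash` join hint (Spark 3.0 and later) is another way to ask JoinSelection for this operator. The following is a minimal sketch, assuming a SparkSession is passed in; the object name, the method name, the datasets and the column names are all made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ShuffleHashHintSketch {
  // Returns the text of the physical plan of a join that carries
  // a shuffle_hash hint on its right side.
  def planWithHint(spark: SparkSession): String = {
    import spark.implicits._
    // Illustrative datasets (not from the original document)
    val left = Seq((0, "a"), (1, "b"), (2, "c")).toDF("id", "l")
    val right = Seq((0, "x"), (1, "y")).toDF("id", "r")
    // The hint is attached to one join side
    val q = left.join(right.hint("shuffle_hash"), Seq("id"))
    q.queryExecution.executedPlan.toString
  }
}
```

The returned plan text is expected to mention ShuffledHashJoin, although the exact tree depends on the Spark version and on whether adaptive query execution is enabled.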
Executing Physical Operator
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
Danger
Review Me
doExecute requests the streamedPlan physical operator to execute (and generate an RDD[InternalRow]).
doExecute requests the buildPlan physical operator to execute (and generate an RDD[InternalRow]).
doExecute requests the streamedPlan physical operator's RDD[InternalRow] to zip partition-wise with the buildPlan physical operator's RDD[InternalRow] (using the RDD.zipPartitions method with the preservesPartitioning flag disabled).
The function that doExecute passes to RDD.zipPartitions takes two iterators of rows: one from a partition of streamedPlan and one from the matching partition of buildPlan.
For every pair of zipped partitions, the function builds a HashedRelation from the buildPlan partition (buildHashedRelation) and joins the streamedPlan partition iterator with it (passing the HashedRelation along with the numOutputRows and avgHashProbe metrics).
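The steps above can be modelled in plain Scala, without Spark. This is only a simplified sketch of the idea (zip matching partitions, build a hash map from the build side, probe it with the streamed side) and not Spark's implementation: the `Row` alias, the `Map`-based relation and all the names below are assumptions made for illustration, and only an inner join is shown.

```scala
object ZipPartitionsJoinSketch {
  // Hypothetical row shape: (join key, payload)
  type Row = (Int, String)

  // Stand-in for a HashedRelation: join key -> build-side rows with that key
  def buildRelation(build: Iterator[Row]): Map[Int, List[Row]] =
    build.toList.groupBy(_._1)

  // Stand-in for the function applied to every pair of zipped partitions
  def joinPartition(
      streamed: Iterator[Row],
      build: Iterator[Row]): Iterator[(Int, String, String)] = {
    val relation = buildRelation(build)
    // Probe the relation once per streamed row (inner-join semantics)
    streamed.flatMap { case (key, s) =>
      relation.getOrElse(key, List.empty[Row]).map { case (_, b) => (key, s, b) }
    }
  }

  // Stand-in for RDD.zipPartitions: the i-th streamed partition
  // is paired with the i-th build partition
  def zipPartitions(
      streamedParts: Seq[Seq[Row]],
      buildParts: Seq[Seq[Row]]): Seq[List[(Int, String, String)]] = {
    require(streamedParts.size == buildParts.size,
      "both sides need the same number of partitions")
    streamedParts.zip(buildParts).map { case (s, b) =>
      joinPartition(s.iterator, b.iterator).toList
    }
  }
}
```

The sketch only works when rows with equal keys land in partitions with the same index on both sides, which is what shuffling both children by the join keys provides.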
Building HashedRelation
buildHashedRelation(
  iter: Iterator[InternalRow]): HashedRelation
Danger
Review Me
buildHashedRelation creates a HashedRelation (for the input iter iterator of InternalRows, buildKeys and the current TaskMemoryManager).
buildHashedRelation records the time to create the HashedRelation as buildTime.
buildHashedRelation requests the HashedRelation for estimatedSize that is recorded as buildDataSize.
buildHashedRelation is used when:
- ShuffledHashJoinExec is requested to execute (when streamedPlan and buildPlan physical operators are executed and their RDDs zipped partition-wise using the RDD.zipPartitions method)
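The bookkeeping described above (time the build, then record an estimated size) can be sketched in plain Scala as follows. The `Metric` class, the `Map`-based relation and the size formula are hypothetical stand-ins chosen for illustration; they are not Spark's SQLMetric, HashedRelation or estimatedSize.

```scala
object BuildRelationMetricsSketch {
  // Hypothetical accumulator standing in for a SQL metric
  final class Metric(val name: String) {
    private var total = 0L
    def add(v: Long): Unit = total = total + v
    def value: Long = total
  }

  def buildRelation(
      rows: Iterator[(Int, String)],
      buildTime: Metric,
      buildDataSize: Metric): Map[Int, List[(Int, String)]] = {
    val start = System.nanoTime()
    // Stand-in for creating a HashedRelation from the build-side iterator
    val relation = rows.toList.groupBy(_._1)
    // Elapsed build time in milliseconds
    buildTime.add((System.nanoTime() - start) / 1000000L)
    // Made-up size estimate: 4 bytes per key plus 2 bytes per character
    val estimatedSize =
      relation.values.flatten.map { case (_, s) => 4L + 2L * s.length }.sum
    buildDataSize.add(estimatedSize)
    relation
  }
}
```

Since the relation is built once per task, each metric accumulates one contribution per partition.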
supportCodegen
supportCodegen: Boolean
supportCodegen is part of the CodegenSupport abstraction.
supportCodegen is true for all the join types except FullOuter.
needCopyResult
needCopyResult: Boolean
needCopyResult is part of the CodegenSupport abstraction.
needCopyResult is true.
Demo
Enable DEBUG logging level for the ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.
// Use ShuffledHashJoinExec's selection requirements
// 1. Disable auto broadcasting
// JoinSelection (canBuildLocalHashMap specifically) requires that
// plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
// That gives that autoBroadcastJoinThreshold has to be at least 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
scala> println(spark.sessionState.conf.numShufflePartitions)
200
// 2. Disable preference on SortMergeJoin
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
val dataset = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "ShuffledHashJoinExec")
).toDF("id", "token")
// Self LEFT SEMI join
val q = dataset.join(dataset, Seq("id"), "leftsemi")
val sizeInBytes = q.queryExecution.optimizedPlan.stats.sizeInBytes
scala> println(sizeInBytes)
72
// 3. canBuildRight is on for leftsemi (hence BuildRight in the physical plan)
// the right join side is at least three times smaller than the left side
// Even though it's a self LEFT SEMI join there are two different join sides
// How is that possible? The right side is pruned to the id column only (see LocalTableScan [id#41])
// BINGO! ShuffledHashJoin is here!
// Enable DEBUG logging level
import org.apache.log4j.{Level, Logger}
val logger = "org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys"
Logger.getLogger(logger).setLevel(Level.DEBUG)
// ShuffledHashJoin with BuildRight
scala> q.explain
== Physical Plan ==
ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
:- Exchange hashpartitioning(id#37, 200)
:  +- LocalTableScan [id#37, token#38]
+- Exchange hashpartitioning(id#41, 200)
   +- LocalTableScan [id#41]
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
01 :- Exchange hashpartitioning(id#37, 200)
02 :  +- LocalTableScan [id#37, token#38]
03 +- Exchange hashpartitioning(id#41, 200)
04    +- LocalTableScan [id#41]
doExecute generates a ZippedPartitionsRDD2 that you can see in an RDD lineage.
scala> println(q.queryExecution.toRdd.toDebugString)
(200) ZippedPartitionsRDD2[8] at toRdd at <console>:26 []
  |   ShuffledRowRDD[3] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[2] at toRdd at <console>:26 []
     |  MapPartitionsRDD[1] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[0] at toRdd at <console>:26 []
  |   ShuffledRowRDD[7] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[6] at toRdd at <console>:26 []
     |  MapPartitionsRDD[5] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[4] at toRdd at <console>:26 []
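The two size checks mentioned in the demo comments can be worked through with plain arithmetic. The sketch below only restates those comments (`sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions`, and "at least three times smaller"); the object name and the byte sizes in the usage note are hypothetical, and the actual checks in JoinSelection may differ between Spark versions.

```scala
object ShuffleHashSelectionSketch {
  // A side qualifies for a local hash map when
  // sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
  def canBuildLocalHashMap(
      sizeInBytes: BigInt,
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions

  // "a is at least three times smaller than b"
  def muchSmaller(a: BigInt, b: BigInt): Boolean =
    a * 3 <= b
}
```

With the demo settings (a threshold of 1 and 200 shuffle partitions) the limit is 200 bytes, so a hypothetical 24-byte build side passes the first check, and it also passes the second check against a hypothetical 72-byte streamed side because 24 * 3 <= 72.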
HashJoin
ShuffledHashJoinExec is a HashJoin.
BuildSide
buildSide: BuildSide
ShuffledHashJoinExec is given a BuildSide when created.
buildSide is part of the HashJoin abstraction.
prepareRelation
prepareRelation(
  ctx: CodegenContext): HashedRelationInfo
prepareRelation requests the given CodegenContext for the code to reference this ShuffledHashJoinExec (under the plan name).
prepareRelation requests the given CodegenContext for the code with a relationTerm mutable state (with the HashedRelation class name, the relation variable name, etc.).
In the end, prepareRelation creates a HashedRelationInfo.
prepareRelation is part of the HashJoin abstraction.
ShuffledJoin
ShuffledHashJoinExec is a ShuffledJoin and performs a hash join of two child relations by first shuffling the data using the join keys.
isSkewJoin Flag
ShuffledHashJoinExec can be given the isSkewJoin flag when created. It is disabled (false) by default.
isSkewJoin can only be enabled (true) when the OptimizeSkewedJoin adaptive physical optimization is requested to optimize a skew join.
isSkewJoin is part of the ShuffledJoin abstraction.
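Since OptimizeSkewedJoin is an adaptive optimization, it only has a chance to run when adaptive query execution and its skew-join handling are switched on. A minimal configuration sketch for spark-shell (where spark is predefined) could look as follows. The property names are standard Spark SQL configuration properties, but whether a shuffled hash join is actually handled as a skew join depends on the Spark version and on runtime partition statistics, so treat this as an assumption to verify.

```scala
// Enable adaptive query execution (a prerequisite for OptimizeSkewedJoin)
spark.conf.set("spark.sql.adaptive.enabled", true)
// Enable skew-join handling in adaptive query execution
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
```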