# BroadcastHashJoinExec Physical Operator

`BroadcastHashJoinExec` is a hash-based join physical operator for broadcast hash join.

`BroadcastHashJoinExec` supports Java code generation (variable prefix: `bhj`).
## Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| numOutputRows | number of output rows | Number of output rows |
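The metric can also be read programmatically from an executed query plan. A minimal sketch, assuming a query `q` whose plan contains the operator (e.g. the query in the Demo below) and Adaptive Query Execution enabled:

```scala
// A sketch: reading numOutputRows after execution (q is assumed to be
// a query with a BroadcastHashJoinExec, as in the Demo below)
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

q.collect()  // execute the query so the metric gets updated

// With AQE enabled, the join sits inside AdaptiveSparkPlanExec
val plan = q.queryExecution.executedPlan match {
  case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
  case other => other
}
val bhj = plan.collectFirst { case op: BroadcastHashJoinExec => op }.get
println(bhj.metrics("numOutputRows").value)
```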
## Creating Instance

`BroadcastHashJoinExec` takes the following to be created:

- Left Key Expressions
- Right Key Expressions
- Join Type
- BuildSide
- Optional Join Condition Expression
- Left Child Physical Operator
- Right Child Physical Operator
- isNullAwareAntiJoin flag
`BroadcastHashJoinExec` is created when:

- JoinSelection execution planning strategy is executed (createBroadcastHashJoin and ExtractSingleColumnNullAwareAntiJoin)
- LogicalQueryStageStrategy execution planning strategy is executed (ExtractEquiJoinKeys and ExtractSingleColumnNullAwareAntiJoin)
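For example, marking one join side with the `broadcast` function is enough for JoinSelection to plan a broadcast hash join. A minimal sketch with made-up datasets (without the hint, the choice is driven by `spark.sql.autoBroadcastJoinThreshold`):

```scala
// A sketch: the broadcast hint makes JoinSelection choose a broadcast hash join
import org.apache.spark.sql.functions.broadcast
val large = spark.range(100000).toDF("id")
val small = spark.range(100).toDF("id")
large.join(broadcast(small), "id").explain()
// Expect a BroadcastHashJoin node in the printed physical plan
```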
## isNullAwareAntiJoin Flag

`BroadcastHashJoinExec` can be given an `isNullAwareAntiJoin` flag when created. The flag is `false` by default.

`isNullAwareAntiJoin` flag is `true` when:

- JoinSelection execution planning strategy is executed (for an ExtractSingleColumnNullAwareAntiJoin)
- LogicalQueryStageStrategy execution planning strategy is executed (for an ExtractSingleColumnNullAwareAntiJoin)
If enabled, `BroadcastHashJoinExec` makes sure that all of the following hold:

- There is exactly one left key
- There is exactly one right key
- Join Type is LeftAnti
- Build Side is BuildRight
- Join condition is not defined
`isNullAwareAntiJoin` is used for the following:

- Required Child Output Distribution (to create a HashedRelationBroadcastMode)
- Executing Physical Operator
- Generating Java Code for Anti Join
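The query shape behind ExtractSingleColumnNullAwareAntiJoin is a single-column NOT IN subquery. A minimal sketch (made-up tables; whether this broadcast variant is picked depends on the Spark version and the size of the subquery side):

```scala
// A sketch: a single-column NOT IN subquery can be planned as a
// null-aware anti join
val t1 = Seq(Some(1), Some(2), Some(3)).toDF("id")
val t2 = Seq(Some(1), None).toDF("id")
t1.createOrReplaceTempView("t1")
t2.createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2)").explain()
// Look for "BroadcastHashJoin ... LeftAnti, BuildRight, true" in the plan;
// the trailing true is the isNullAwareAntiJoin flag
```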
## Required Child Output Distribution

```scala
requiredChildDistribution: Seq[Distribution]
```

`requiredChildDistribution` is part of the SparkPlan abstraction.
| BuildSide | Left Child | Right Child |
|-----------|------------|-------------|
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys |
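The distributions can be inspected on a live operator. A sketch, reusing the `bhj` value collected in the Performance Metrics example above:

```scala
// A sketch: inspecting the required child distributions
bhj.requiredChildDistribution.foreach(println)
// With BuildRight, expect something like:
//   UnspecifiedDistribution
//   BroadcastDistribution(HashedRelationBroadcastMode(List(...),false))
```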
## Output Data Partitioning

```scala
outputPartitioning: Partitioning
```

`outputPartitioning` is part of the SparkPlan abstraction.

`outputPartitioning`...FIXME
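The partitioning can at least be inspected on a live operator (a sketch, reusing `bhj` from the Performance Metrics example above):

```scala
// A sketch: inspecting the operator's output partitioning
println(bhj.outputPartitioning)
```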
## Executing Physical Operator

```scala
doExecute(): RDD[InternalRow]
```

`doExecute` is part of the SparkPlan abstraction.

`doExecute` requests the buildPlan to executeBroadcast (that gives a broadcast variable with a HashedRelation).

`doExecute` then branches off based on the isNullAwareAntiJoin flag: enabled or not.
### isNullAwareAntiJoin Enabled
`doExecute`...FIXME
### isNullAwareAntiJoin Disabled

`doExecute` requests the streamedPlan to execute (that gives an `RDD[InternalRow]`) and maps over partitions (`RDD.mapPartitions`) to do the following (sketched after this list):

- Take a read-only copy of the HashedRelation (from the broadcast variable)
- Increment the peak execution memory (of the task) by the size of the HashedRelation
- Join the rows with the HashedRelation (with the numOutputRows metric)
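Put together, the non-null-aware path boils down to the following shape (a condensed paraphrase of the operator's code, not a standalone runnable snippet; details vary across Spark versions):

```scala
// A condensed paraphrase of the non-null-aware doExecute path
val numOutputRows = longMetric("numOutputRows")
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
streamedPlan.execute().mapPartitions { streamedIter =>
  // take a read-only copy so the relation can be used safely within the task
  val hashed = broadcastRelation.value.asReadOnlyCopy()
  // account for the relation's size in the task's peak execution memory
  TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
  // HashJoin.join does the per-row joining and updates numOutputRows
  join(streamedIter, hashed, numOutputRows)
}
```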
## Generating Java Code for Anti Join

```scala
codegenAnti(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String
```

`codegenAnti` is part of the HashJoin abstraction.

`codegenAnti`...FIXME
## Demo

```scala
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")
val q = tokens.join(tokens, Seq("id"), "inner")
```
```text
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Project [id#18, token#19, token#25]
02    +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
03       :- LocalTableScan [id#18, token#19]
04       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
05          +- LocalTableScan [id#24, token#25]
```
```scala
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
  .queryExecution
  .executedPlan
  .collect { case op: AdaptiveSparkPlanExec => op }
  .head
```
```text
scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#18, token#19, token#25]
   +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- LocalTableScan [id#18, token#19]
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]
```
Execute the adaptive operator to generate the final execution plan.

```scala
op.executeTake(1)
```
Mind the isFinalPlan flag that is now enabled.
```text
scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [id#18, token#19, token#25]
   +- *(1) BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- *(1) LocalTableScan [id#18, token#19]
      +- BroadcastQueryStage 0
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
            +- LocalTableScan [id#24, token#25]
+- == Initial Plan ==
   Project [id#18, token#19, token#25]
   +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- LocalTableScan [id#18, token#19]
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]
```
With the isFinalPlan flag enabled, it is possible to print out the WholeStageCodegen subtrees.
```text
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:265; maxConstantPoolSize:146(0.22% used); numInnerClasses:0) ==
*(1) Project [id#18, token#19, token#25]
+- *(1) BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
   :- *(1) LocalTableScan [id#18, token#19]
   +- BroadcastQueryStage 0
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator localtablescan_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] bhj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     localtablescan_input_0 = inputs[0];
/* 021 */
/* 022 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy();
/* 023 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 024 */
/* 025 */     bhj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);
/* 026 */     bhj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 64);
/* 027 */
/* 028 */   }
...
```
Let's access the generated source code via the WholeStageCodegenExec physical operator.
```scala
val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
  .collect { case op: WholeStageCodegenExec => op }
  .head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
```
```text
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator localtablescan_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] bhj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
...
```