BroadcastHashJoinExec Physical Operator

BroadcastHashJoinExec is a hash-based join physical operator that performs a broadcast hash join: the build side is first broadcast to every executor (as a HashedRelation) and then joined with the streamed side, with no shuffle of the streamed side.

BroadcastHashJoinExec supports Java code generation (variable prefix: bhj).

Performance Metrics

| Key | Name (in web UI) | Description |
| --- | --- | --- |
| numOutputRows | number of output rows | Number of output rows |

Figure: BroadcastHashJoinExec in web UI (Details for Query)

Creating Instance

BroadcastHashJoinExec takes the following to be created:

  1. Left join key expressions
  2. Right join key expressions
  3. Join type
  4. BuildSide
  5. Optional join condition expression
  6. Left child physical operator
  7. Right child physical operator
  8. isNullAwareAntiJoin flag

BroadcastHashJoinExec is created when JoinSelection execution planning strategy is executed (and selects a broadcast hash join).
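For reference, this is roughly the shape of the constructor (abridged from the Spark 3 sources; the mixed-in traits are omitted):

case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan,
    isNullAwareAntiJoin: Boolean = false)
  extends HashJoin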

isNullAwareAntiJoin Flag

BroadcastHashJoinExec can be given isNullAwareAntiJoin flag when created.

isNullAwareAntiJoin flag is false by default.

isNullAwareAntiJoin flag is true when JoinSelection execution planning strategy plans a Join logical operator that matches ExtractSingleColumnNullAwareAntiJoin.

If enabled, BroadcastHashJoinExec makes sure that all of the following hold (see the sketch after the list):

  1. There is one left key only
  2. There is one right key only
  3. Join Type is LeftAnti
  4. Build Side is BuildRight
  5. Join condition is not defined
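These requirements correspond to require assertions in the operator itself. A paraphrased sketch (the exact error messages in the Spark sources may differ):

if (isNullAwareAntiJoin) {
  require(leftKeys.length == 1, "leftKeys length should be 1")
  require(rightKeys.length == 1, "rightKeys length should be 1")
  require(joinType == LeftAnti, "joinType must be LeftAnti")
  require(buildSide == BuildRight, "buildSide must be BuildRight")
  require(condition.isEmpty, "join condition must be empty")
}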

isNullAwareAntiJoin is used for the following:

  1. Required Child Output Distribution (as the isNullAware flag of HashedRelationBroadcastMode)
  2. Executing Physical Operator (doExecute branches off based on the flag)

Required Child Output Distribution

requiredChildDistribution: Seq[Distribution]

requiredChildDistribution is part of the SparkPlan abstraction.

| BuildSide | Left Child | Right Child |
| --- | --- | --- |
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys |
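In code, the table above boils down to something like this (abridged from the Spark sources; note that isNullAwareAntiJoin becomes the isNullAware flag of the broadcast mode):

override def requiredChildDistribution: Seq[Distribution] = {
  // The build side is broadcast as a HashedRelation keyed by the build join keys
  val mode = HashedRelationBroadcastMode(buildKeys, isNullAwareAntiJoin)
  buildSide match {
    case BuildLeft =>
      BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
    case BuildRight =>
      UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil
  }
}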

Output Data Partitioning Requirements

outputPartitioning: Partitioning

outputPartitioning is part of the SparkPlan abstraction.

outputPartitioning...FIXME

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute requests the buildPlan to executeBroadcast (that gives a broadcast variable with a HashedRelation).

doExecute branches off based on isNullAwareAntiJoin flag: enabled or not.

isNullAwareAntiJoin Enabled

doExecute...FIXME

isNullAwareAntiJoin Disabled

doExecute requests the streamedPlan to execute (that gives an RDD[InternalRow]) and maps over partitions (RDD.mapPartitions):

  1. Takes a read-only copy of the HashedRelation (from the broadcast variable)
  2. Increments the peak execution memory (of the task) by the size of the HashedRelation
  3. Joins the rows with the HashedRelation (with the numOutputRows metric)
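The whole non-null-aware path can be sketched as follows (a simplified paraphrase of the Spark sources; the join method comes from the HashJoin abstraction):

protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  // Broadcast variable with the build-side rows as a HashedRelation
  val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
  streamedPlan.execute().mapPartitions { streamedIter =>
    // Every task works on its own read-only copy of the relation
    val hashed = broadcastRelation.value.asReadOnlyCopy()
    TaskContext.get().taskMetrics()
      .incPeakExecutionMemory(hashed.estimatedSize)
    // join comes from the HashJoin abstraction and updates numOutputRows
    join(streamedIter, hashed, numOutputRows)
  }
}

Note how the generated code in the Demo below mirrors this exact pattern (asReadOnlyCopy and incPeakExecutionMemory on bhj_relation_0).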

Generating Java Code for Anti Join

codegenAnti(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

codegenAnti is part of the HashJoin abstraction.

codegenAnti...FIXME

Demo

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")
val q = tokens.join(tokens, Seq("id"), "inner")
scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Project [id#18, token#19, token#25]
02    +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
03       :- LocalTableScan [id#18, token#19]
04       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
05          +- LocalTableScan [id#24, token#25]
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
val op = q
  .queryExecution
  .executedPlan
  .collect { case op: AdaptiveSparkPlanExec => op }
  .head
scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#18, token#19, token#25]
   +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- LocalTableScan [id#18, token#19]
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]

Execute the adaptive operator to generate the final execution plan.

op.executeTake(1)

Mind the isFinalPlan flag that is now enabled.

scala> println(op.treeString)
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [id#18, token#19, token#25]
   +- *(1) BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- *(1) LocalTableScan [id#18, token#19]
      +- BroadcastQueryStage 0
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
            +- LocalTableScan [id#24, token#25]
+- == Initial Plan ==
   Project [id#18, token#19, token#25]
   +- BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
      :- LocalTableScan [id#18, token#19]
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]

With the isFinalPlan flag enabled, it is possible to print out the WholeStageCodegen subtrees.

scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:265; maxConstantPoolSize:146(0.22% used); numInnerClasses:0) ==
*(1) Project [id#18, token#19, token#25]
+- *(1) BroadcastHashJoin [id#18], [id#24], Inner, BuildRight, false
   :- *(1) LocalTableScan [id#18, token#19]
   +- BroadcastQueryStage 0
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#16]
         +- LocalTableScan [id#24, token#25]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator localtablescan_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] bhj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     localtablescan_input_0 = inputs[0];
/* 021 */
/* 022 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy();
/* 023 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 024 */
/* 025 */     bhj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 64);
/* 026 */     bhj_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 64);
/* 027 */
/* 028 */   }
...

Let's access the generated source code via the WholeStageCodegenExec physical operator.

val aqe = op
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = aqe.executedPlan
  .collect { case op: WholeStageCodegenExec => op }
  .head
val (_, source) = wsce.doCodeGen
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
val formattedCode = CodeFormatter.format(source)
scala> println(formattedCode)
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator localtablescan_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] bhj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
...