ExtractEquiJoinKeys Scala Extractor¶
ExtractEquiJoinKeys is a Scala extractor to destructure a Join logical operator into a tuple of the following elements:

- Join type
- Left and right join keys (for non-empty join keys in the condition of the Join operator)
- Optional join condition (a Catalyst expression) that could be used as a new join condition
- Left and right logical operators
- JoinHint
unapply gives None (aka nothing) when no join keys were found or the logical plan is not a Join logical operator.
Demo¶
val left = Seq((0, 1, "zero"), (1, 2, "one")).toDF("k1", "k2", "name")
val right = Seq((0, 0, "0"), (1, 1, "1")).toDF("k1", "k2", "name")
The following join query returns no rows, but that is enough for demo purposes.
val q = left
.join(right, Seq("k1", "k2", "name"))
.where(left("k1") > 3)
import org.apache.spark.sql.catalyst.plans.logical.Join
val plan = q.queryExecution.analyzed
val join = plan.collectFirst { case j: Join => j }.get
assert(join.condition.isDefined)
Enable DEBUG logging level for the ExtractEquiJoinKeys logger:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys").setLevel(Level.DEBUG)
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
val joinParts = ExtractEquiJoinKeys.unapply(join)
21/05/03 19:16:07 DEBUG ExtractEquiJoinKeys: Considering join on: Some((((k1#10 = k1#39) AND (k2#11 = k2#40)) AND (name#12 = name#41)))
21/05/03 19:16:07 DEBUG ExtractEquiJoinKeys: leftKeys:List(k1#10, k2#11, name#12) | rightKeys:List(k1#39, k2#40, name#41)
joinParts: Option[org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys.ReturnType] =
Some((Inner,List(k1#10, k2#11, name#12),List(k1#39, k2#40, name#41),None,Project [_1#3 AS k1#10, _2#4 AS k2#11, _3#5 AS name#12]
+- LocalRelation [_1#3, _2#4, _3#5]
,Project [_1#32 AS k1#39, _2#33 AS k2#40, _3#34 AS name#41]
+- LocalRelation [_1#32, _2#33, _3#34]
,))
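The same destructuring is usually expressed with a pattern match (that is how planning strategies use the extractor). The following sketch reuses the join operator from the demo and assumes the 7-element ReturnType described in the next section:

// A pattern match binds the tuple elements to names directly
join match {
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, otherCondition, left, right, hint) =>
    println(s"$joinType equi-join with leftKeys=$leftKeys and rightKeys=$rightKeys")
  case _ =>
    println("Not an equi-join")
}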
Destructuring Join Logical Plan¶
type ReturnType =
(JoinType,
Seq[Expression],
Seq[Expression],
Option[Expression],
LogicalPlan,
LogicalPlan,
JoinHint)
unapply(
join: Join): Option[ReturnType]
unapply prints out the following DEBUG message to the logs:
Considering join on: [condition]
unapply then splits the join condition at And expressions (if there are any) into a list of predicate expressions, as sketched below.
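A minimal sketch of that split, reusing the join operator from the demo (the splitConjunctive helper is illustrative only, not Spark's own API):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Recursively split a condition at And into individual predicates
def splitConjunctive(condition: Expression): Seq[Expression] = condition match {
  case And(left, right) => splitConjunctive(left) ++ splitConjunctive(right)
  case other => Seq(other)
}

val predicates = join.condition.map(splitConjunctive).getOrElse(Nil)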
unapply finds EqualTo and EqualNullSafe binary predicates (with one side evaluable against the left operator and the other against the right operator) to collect the join keys for the left and right side.

unapply takes the remaining predicate expressions (those that could not be turned into equi-join key pairs) to build otherPredicates.
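A simplified sketch of the key collection, reusing predicates from the sketch above (only plain EqualTo predicates are handled; Spark additionally rewrites EqualNullSafe keys, which is omitted here, and the helper names are illustrative):

import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// An expression can be evaluated by a plan when it references only the plan's output
def evaluableBy(e: Expression, plan: LogicalPlan): Boolean =
  e.references.subsetOf(plan.outputSet)

// Collect (leftKey, rightKey) pairs from equality predicates that span both sides
def joinKeyPairs(predicates: Seq[Expression], left: LogicalPlan, right: LogicalPlan): Seq[(Expression, Expression)] =
  predicates.collect {
    case EqualTo(l, r) if evaluableBy(l, left) && evaluableBy(r, right) => (l, r)
    case EqualTo(l, r) if evaluableBy(l, right) && evaluableBy(r, left) => (r, l)
  }

val pairs = joinKeyPairs(predicates, join.left, join.right)
val (leftKeys, rightKeys) = pairs.unzip

The final unzip mirrors the last step below, where the collected pairs are split into separate left and right key collections.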
In the end, unapply splits the pairs of join keys into separate collections of left and right join keys and prints out the following DEBUG message to the logs:

leftKeys:[leftKeys] | rightKeys:[rightKeys]
unapply is used when:

- JoinEstimation is requested to estimateInnerOuterJoin
- JoinSelection and LogicalQueryStageStrategy execution planning strategies are executed
- NormalizeFloatingNumbers and PartitionPruning logical optimizations are executed
Logging¶
Enable ALL logging level for org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see what happens inside.
Add the following lines to conf/log4j2.properties:

logger.ExtractEquiJoinKeys.name = org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
logger.ExtractEquiJoinKeys.level = all
Refer to Logging.