StreamingSymmetricHashJoinExec Physical Operator

StreamingSymmetricHashJoinExec is a binary physical operator (Spark SQL) for executing stream-stream equi-joins.

Creating Instance

StreamingSymmetricHashJoinExec takes the following to be created:

StreamingSymmetricHashJoinExec is created when:

  • StreamingJoinStrategy execution planning strategy is executed (to plan an equi-join Join logical operator of two streaming logical query plans)

Equi-Join

An Equi-Join is a join with a join condition containing the following equality operators:

Join Type

StreamingSymmetricHashJoinExec is given a JoinType (Spark SQL) that can be one of the following:

  • FullOuter
  • Inner
  • LeftOuter
  • LeftSemi
  • RightOuter

JoinStateWatermarkPredicates

StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates when created.

JoinStateWatermarkPredicates is empty (i.e., undefined JoinStateWatermarkPredicates) when StreamingJoinStrategy execution planning strategy is executed (on a stream-stream equi-join).

JoinStateWatermarkPredicates can be assigned when IncrementalExecution is requested for the preparations rules to produce an executedPlan query plan (using StreamingSymmetricHashJoinHelper).

StateStoreWriter

StreamingSymmetricHashJoinExec is a StateStoreWriter.

Executing Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan (Spark SQL) abstraction.


doExecute requests the left child physical operator to execute (which produces an RDD[InternalRow]) and then uses stateStoreAwareZipPartitions on that RDD with the following:

Computing Partition

processPartitions(
  partitionId: Int,
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]

processPartitions is used as the function to compute a partition of StateStoreAwareZipPartitionsRDD.

processPartitions uses the following performance metrics:

processPartitions creates the following:

processPartitions requests the OneSideHashJoiner of the left side to storeAndJoinWithOtherSide with the OneSideHashJoiner of the right side.

processPartitions then does the opposite for the right side (i.e., requests the OneSideHashJoiner of the right side to storeAndJoinWithOtherSide with the OneSideHashJoiner of the left side).

Note

Executing storeAndJoinWithOtherSide updates an internal JoinedRow variable.

processPartitions creates a CompletionIterator as a composition of all the joined rows (iterators) from the left side first followed by the right side.
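
The store-then-join flow described above can be illustrated with a minimal, self-contained sketch in plain Scala. It is not Spark's implementation: OneSideJoiner is a hypothetical stand-in for OneSideHashJoiner, the state is an in-memory map rather than a state store, rows are simple (key, value) pairs, and watermark-based state cleanup, join conditions and outer joins are left out.

```scala
import scala.collection.mutable

object SymmetricHashJoinSketch {

  // Hypothetical, simplified stand-in for one side of the join
  // (NOT Spark's OneSideHashJoiner).
  final class OneSideJoiner[K, V] {
    // Rows seen so far on this side, grouped by join key.
    // Spark keeps this in a state store; a mutable map is an assumption here.
    private val state = mutable.Map.empty[K, mutable.ArrayBuffer[V]]

    // Snapshot of the rows stored for a key.
    def rowsFor(key: K): List[V] = state.get(key).map(_.toList).getOrElse(Nil)

    // Lazily stores every input row on this side and pairs it
    // with the matching rows already stored on the other side.
    def storeAndJoinWithOtherSide[W, R](
        input: Iterator[(K, V)],
        other: OneSideJoiner[K, W])(joinRows: (V, W) => R): Iterator[R] =
      input.flatMap { case (key, row) =>
        state.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += row
        other.rowsFor(key).map(otherRow => joinRows(row, otherRow))
      }
  }

  // Inner-join output: joined rows of the left side first, then the right side.
  // Iterator ++ is lazy, so the left input is fully consumed before the right one.
  def processPartitions(
      leftInput: Iterator[(Int, String)],
      rightInput: Iterator[(Int, String)],
      leftSide: OneSideJoiner[Int, String],
      rightSide: OneSideJoiner[Int, String]): Iterator[(String, String)] = {
    val leftOutput =
      leftSide.storeAndJoinWithOtherSide(leftInput, rightSide)((l, r) => (l, r))
    val rightOutput =
      rightSide.storeAndJoinWithOtherSide(rightInput, leftSide)((r, l) => (l, r))
    leftOutput ++ rightOutput
  }
}
```

In this sketch the joiners are created once and reused across calls, so a row stored in one micro-batch can match a row that arrives on the other side in a later micro-batch. Because the left input is drained first, every matching pair is emitted exactly once.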

processPartitions creates an output iterator with the joined rows based on JoinType:

  • For Inner or LeftSemi join types, there are no changes to the CompletionIterator
  • For...FIXME

processPartitions updates the numOutputRows metric with the number of joined rows in the output iterator.

In the end, processPartitions updates the metrics:


processPartitions throws an IllegalStateException when the StatefulOperatorStateInfo has not been defined:

Cannot execute join as state info was not specified
[this]
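
As a rough illustration of this check, the sketch below models the state info as an Option and fails with the same message when it is undefined. StateInfo and requireStateInfo are hypothetical names used only for this example (the real StatefulOperatorStateInfo carries more fields), and the operator argument stands in for the text representation of the operator that follows the message.

```scala
object StateInfoCheckSketch {

  // Hypothetical stand-in for StatefulOperatorStateInfo, reduced to one field.
  final case class StateInfo(numPartitions: Int)

  // Returns the state info if defined; otherwise throws an IllegalStateException
  // with the message above followed by the operator's text representation.
  def requireStateInfo(stateInfo: Option[StateInfo], operator: String): StateInfo =
    stateInfo.getOrElse {
      throw new IllegalStateException(
        s"Cannot execute join as state info was not specified\n$operator")
    }
}
```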

Short Name

shortName: String

shortName is part of the StateStoreWriter abstraction.


shortName is the following text:

symmetricHashJoin

Required Child Output Distribution

requiredChildDistribution: Seq[Distribution]

requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.


requiredChildDistribution is two StatefulOpClusteredDistributions for the left and right keys (with the numPartitions of the StatefulOperatorStateInfo).
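
The effect of requiring both children to be clustered by their join keys with the same number of partitions can be sketched as follows: rows with equal keys from either side end up in the partition with the same id, so they can be joined against the same partition of state. The sketch is only an illustration in plain Scala; partitionOf and partition are hypothetical helpers, and the hash function used here (hashCode with a non-negative modulo) is an assumption that differs from the one Spark uses.

```scala
object CoPartitioningSketch {

  // Hypothetical partitioner: maps a join key to a partition id in [0, numPartitions).
  // Spark's actual hash function is different; only the co-location idea matters here.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Groups (key, row) pairs by the partition id of their key.
  def partition[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (key, _) => partitionOf(key, numPartitions) }
}
```

Applying partition with the same numPartitions to the rows of the left and the right side places equal keys under the same partition id on both sides. With a different number of partitions per side, that guarantee would not hold.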

Should Run Another Non-Data Batch

shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is part of the StateStoreWriter abstraction.


shouldRunAnotherBatch is positive (true) when all of the following are positive:

shouldRunAnotherBatch is negative (false) otherwise.
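
The conditions themselves are not listed in this section. The sketch below shows one plausible shape of the check and rests on assumptions that may not match every Spark version: that the decision depends on (1) at least one side having a state watermark predicate (so the watermark is used for state cleanup) and (2) the batch watermark in the new metadata being later than the event-time watermark the operator was planned with. OffsetSeqMetadata and WatermarkPredicates here are reduced, hypothetical stand-ins, not the Spark classes.

```scala
object ShouldRunAnotherBatchSketch {

  // Reduced, hypothetical stand-ins (assumptions for illustration only).
  final case class OffsetSeqMetadata(batchWatermarkMs: Long)
  final case class WatermarkPredicates(left: Option[String], right: Option[String])

  def shouldRunAnotherBatch(
      predicates: WatermarkPredicates,
      eventTimeWatermark: Option[Long],
      newMetadata: OffsetSeqMetadata): Boolean = {
    // Assumption 1: the watermark is used to clean up state on at least one side.
    val watermarkUsedForStateCleanup =
      predicates.left.nonEmpty || predicates.right.nonEmpty
    // Assumption 2: the watermark has advanced past the one used for this plan.
    val watermarkHasChanged =
      eventTimeWatermark.exists(newMetadata.batchWatermarkMs > _)
    watermarkUsedForStateCleanup && watermarkHasChanged
  }
}
```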

Performance Metrics

StreamingSymmetricHashJoinExec uses the same performance metrics as the other stateful physical operators that write to a state store.

StreamingSymmetricHashJoinExec in web UI (Details for Query)