StreamingSymmetricHashJoinExec Physical Operator¶
StreamingSymmetricHashJoinExec is a binary physical operator (Spark SQL) that executes a stream-stream equi-join.
Creating Instance¶
StreamingSymmetricHashJoinExec takes the following to be created:
- Left Keys
- Right Keys
- JoinType
- JoinConditionSplitPredicates
- StatefulOperatorStateInfo
- Event-Time Watermark
- JoinStateWatermarkPredicates
- State Format Version
- Left Child Physical Operator
- Right Child Physical Operator
StreamingSymmetricHashJoinExec is created when:
- StreamingJoinStrategy execution planning strategy is executed (to plan an equi-join Join logical operator of two streaming logical query plans)
Equi-Join
An Equi-Join is a join whose join condition is made up of equality operators.
Join Type¶
StreamingSymmetricHashJoinExec is given a JoinType (Spark SQL) that can be one of the following:
- FullOuter
- Inner
- LeftOuter
- LeftSemi
- RightOuter
JoinStateWatermarkPredicates¶
StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates when created.
JoinStateWatermarkPredicates is empty (i.e., undefined JoinStateWatermarkPredicates) when StreamingJoinStrategy execution planning strategy is executed (on a stream-stream equi-join).
JoinStateWatermarkPredicates can be assigned when IncrementalExecution is requested for the preparations rules to produce an executedPlan query plan (using StreamingSymmetricHashJoinHelper).
StateStoreWriter¶
StreamingSymmetricHashJoinExec is a StateStoreWriter.
Executing Operator¶
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute requests the left child physical operator to execute (which produces an RDD[InternalRow]) and then applies stateStoreAwareZipPartitions to it with the following:
- right child physical operator's RDD[InternalRow]
- StatefulOperatorStateInfo (that at this point is supposed to be defined)
- Names of the state stores of the left and right side of the join
- StateStoreCoordinator endpoint
- processPartitions function
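The shape of this call can be sketched in plain Scala without Spark. This is a minimal model only, not Spark's implementation: `ZipSketch`, `Partitioned` and `zipPartitionsWithIndex` are hypothetical names, and everything related to state stores (store names, coordinator endpoint, preferred locations) is left out.

```scala
object ZipSketch {
  // Hypothetical stand-in for a partitioned dataset: one Vector of rows per partition.
  type Partitioned[A] = Vector[Vector[A]]

  // Pairs up partition i of the left side with partition i of the right side
  // and hands both iterators, plus the partition id, to the function `f`.
  def zipPartitionsWithIndex[A, B, C](
      left: Partitioned[A],
      right: Partitioned[B])(
      f: (Int, Iterator[A], Iterator[B]) => Iterator[C]): Partitioned[C] = {
    require(left.length == right.length, "both sides must have the same number of partitions")
    left.zip(right).zipWithIndex.map { case ((l, r), partitionId) =>
      f(partitionId, l.iterator, r.iterator).toVector
    }
  }
}
```

The function argument has the same shape as processPartitions: a partition id and one iterator per side in, one iterator of results out.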
Computing Partition¶
processPartitions(
partitionId: Int,
leftInputIter: Iterator[InternalRow],
rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]
processPartitions is used as the function to compute a partition of StateStoreAwareZipPartitionsRDD.
processPartitions uses the following performance metrics:
- numOutputRows
- numUpdatedStateRows
- number of total state rows
- time to update
- numRemovedStateRows
- time to remove
- time to commit changes
- stateMemory
processPartitions creates the following:
- postJoinFilter predicate
- OneSideHashJoiners for the left and right side of the join
processPartitions requests the OneSideHashJoiner of the left side to storeAndJoinWithOtherSide with the one of the right side.
processPartitions does the opposite with the right side (i.e., requests the OneSideHashJoiner of the right side to storeAndJoinWithOtherSide with the one of the left side).
Note
Executing storeAndJoinWithOtherSide updates an internal JoinedRow variable.
processPartitions creates a CompletionIterator as a composition of all the joined rows (iterators) from the left side first followed by the right side.
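The store-then-probe pattern described above can be illustrated with a small plain-Scala model. It is a sketch under simplifying assumptions, not Spark's OneSideHashJoiner: `SymmetricJoinSketch`, `OneSide`, `rowsFor` and `innerJoin` are hypothetical names, state lives in an in-memory map rather than a state store, only an inner join is modelled, and there are no watermarks, filters or state eviction.

```scala
import scala.collection.mutable

object SymmetricJoinSketch {
  // Hypothetical, simplified stand-in for one side of the join:
  // it keeps every row it has seen so far, grouped by join key.
  final class OneSide[K, V] {
    private val state = mutable.Map.empty[K, Vector[V]]

    def rowsFor(key: K): Vector[V] = state.getOrElse(key, Vector.empty)

    // Stores each input row in this side's state and joins it with the rows
    // the other side has stored so far under the same key. The result is lazy:
    // rows are stored and joined only as the returned iterator is consumed.
    def storeAndJoinWithOtherSide[O, Out](
        input: Iterator[(K, V)],
        other: OneSide[K, O])(
        combine: (V, O) => Out): Iterator[Out] =
      input.flatMap { case (key, value) =>
        state.update(key, rowsFor(key) :+ value)
        other.rowsFor(key).iterator.map(o => combine(value, o))
      }
  }

  def innerJoin[K, L, R](
      left: OneSide[K, L],
      right: OneSide[K, R],
      leftInput: Iterator[(K, L)],
      rightInput: Iterator[(K, R)]): Iterator[(L, R)] = {
    val leftOutput = left.storeAndJoinWithOtherSide(leftInput, right)((l, r) => (l, r))
    val rightOutput = right.storeAndJoinWithOtherSide(rightInput, left)((r, l) => (l, r))
    // Left output is drained before right output starts, so right-side rows
    // also see the left-side rows stored during the same call.
    leftOutput ++ rightOutput
  }
}
```

Because each side probes only what the other side has already stored, a matching pair is emitted once, by whichever row is processed second. Reusing the same two `OneSide` instances across calls models rows from an earlier micro-batch matching rows that arrive later.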
processPartitions creates an output iterator with the joined rows based on JoinType:
- For Inner or LeftSemi join types, there are no changes to the CompletionIterator
- For...FIXME
processPartitions updates the numOutputRows metric with the number of joined rows in the output iterator.
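One way to keep such a counter in step with a lazily consumed iterator is to increment it as each row passes through. The sketch below assumes that approach; `OutputRowCounting`, `Counter` and `countRows` are hypothetical names, and `Counter` only stands in for a real metric.

```scala
object OutputRowCounting {
  // Hypothetical stand-in for a numeric metric.
  final class Counter { var value: Long = 0L }

  // Wraps an iterator so that the counter grows by one for every row
  // that a downstream consumer actually pulls from it.
  def countRows[A](rows: Iterator[A], numOutputRows: Counter): Iterator[A] =
    rows.map { row =>
      numOutputRows.value += 1
      row
    }
}
```

Nothing is counted until the wrapped iterator is consumed, so the value reflects the rows actually produced rather than the rows that could have been produced.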
In the end, processPartitions updates the metrics:
- allRemovalsTimeMs to be the time of removeOldState on the left and/or right sides of the join (based on the JoinType)
- others
processPartitions throws an IllegalStateException when the StatefulOperatorStateInfo has not been defined:
Cannot execute join as state info was not specified
[this]
Short Name¶
shortName: String
shortName is part of the StateStoreWriter abstraction.
shortName is the following text:
symmetricHashJoin
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.
requiredChildDistribution is two StatefulOpClusteredDistributions for the left and right keys (with the numPartitions of the StatefulOperatorStateInfo).
Should Run Another Non-Data Batch¶
shouldRunAnotherBatch(
newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
shouldRunAnotherBatch is positive (true) when all of the following are positive:
- Either the left or right join state watermark predicate is defined (in the JoinStateWatermarkPredicates)
- This StreamingSymmetricHashJoinExec operator has event-time watermark defined and the current event-time watermark threshold of the given OffsetSeqMetadata is above (greater than) it, i.e. moved above
shouldRunAnotherBatch is negative (false) otherwise.
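The two conditions combine into a single boolean, which the following plain-Scala sketch spells out. The types are simplified, hypothetical stand-ins (`WatermarkPredicates` for JoinStateWatermarkPredicates, `BatchMetadata` for OffsetSeqMetadata), the predicates are plain strings, and the watermark is assumed to be a millisecond value.

```scala
object RunAnotherBatchSketch {
  // Hypothetical, simplified stand-ins for the structures named in the text.
  final case class WatermarkPredicates(left: Option[String], right: Option[String])
  final case class BatchMetadata(batchWatermarkMs: Long)

  def shouldRunAnotherBatch(
      predicates: WatermarkPredicates,
      eventTimeWatermarkMs: Option[Long],
      newMetadata: BatchMetadata): Boolean = {
    // Condition 1: at least one side uses the watermark to clean up state.
    val watermarkUsedForStateCleanup =
      predicates.left.nonEmpty || predicates.right.nonEmpty
    // Condition 2: the operator has a watermark and the new one is strictly above it.
    val watermarkHasAdvanced =
      eventTimeWatermarkMs.exists(current => newMetadata.batchWatermarkMs > current)
    watermarkUsedForStateCleanup && watermarkHasAdvanced
  }
}
```

With no state watermark predicate on either side, an extra batch could not remove any state, so the result is false regardless of how far the watermark has moved.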
Performance Metrics¶
StreamingSymmetricHashJoinExec uses the same performance metrics as the other stateful physical operators that write to a state store.
