StreamingSymmetricHashJoinExec Physical Operator
StreamingSymmetricHashJoinExec is a binary physical operator (Spark SQL) for executing stream-stream equi-joins.
Creating Instance
StreamingSymmetricHashJoinExec takes the following to be created:
- Left Keys
- Right Keys
- JoinType
- JoinConditionSplitPredicates
- StatefulOperatorStateInfo
- Event-Time Watermark
- JoinStateWatermarkPredicates
- State Format Version
- Left Child Physical Operator
- Right Child Physical Operator
StreamingSymmetricHashJoinExec is created when:
- StreamingJoinStrategy execution planning strategy is executed (to plan an equi-join Join logical operator of two streaming logical query plans)
Equi-Join
An equi-join is a join whose join condition contains equality predicates between the left and right sides, i.e. EqualTo (=) or EqualNullSafe (<=>) expressions.
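To make the definition concrete, here is a rough sketch of how a join condition could be split into left join keys, right join keys and the leftover (non-equality) conjuncts. Spark does this with the ExtractEquiJoinKeys pattern over Catalyst expressions; the tiny expression model below (Col, EqualTo, EqualNullSafe, GreaterThan, And) and EquiJoinKeys.extract are hypothetical simplifications, not Catalyst's API.

```scala
// Hypothetical, minimal expression model (NOT Spark's Catalyst classes).
sealed trait Expr extends Product with Serializable
final case class Col(side: String, name: String) extends Expr // side: "left" or "right"
final case class EqualTo(l: Expr, r: Expr) extends Expr
final case class EqualNullSafe(l: Expr, r: Expr) extends Expr
final case class GreaterThan(l: Expr, r: Expr) extends Expr
final case class And(l: Expr, r: Expr) extends Expr

object EquiJoinKeys {
  // Flattens a conjunction (a AND b AND c) into its conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // An equality contributes a key pair only if it compares a left-side
  // column with a right-side column (in either order).
  private def keyPair(l: Expr, r: Expr): Option[(Col, Col)] = (l, r) match {
    case (a @ Col("left", _), b @ Col("right", _)) => Some((a, b))
    case (a @ Col("right", _), b @ Col("left", _)) => Some((b, a))
    case _                                         => None
  }

  // Returns (leftKeys, rightKeys, remaining non-equi conjuncts).
  def extract(cond: Expr): (List[Col], List[Col], List[Expr]) = {
    val tagged: List[Either[(Col, Col), Expr]] = conjuncts(cond).map {
      case c @ EqualTo(l, r)       => keyPair(l, r).toLeft(c)
      case c @ EqualNullSafe(l, r) => keyPair(l, r).toLeft(c)
      case c                       => Right(c)
    }
    val pairs  = tagged.collect { case Left(p) => p }
    val others = tagged.collect { case Right(c) => c }
    (pairs.map(_._1), pairs.map(_._2), others)
  }
}
```

In this model, the leftover conjuncts loosely correspond to what the operator later splits into JoinConditionSplitPredicates.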
Join Type
StreamingSymmetricHashJoinExec is given a JoinType (Spark SQL) that can be one of the following:
- FullOuter
- Inner
- LeftOuter
- LeftSemi
- RightOuter
JoinStateWatermarkPredicates
StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates when created.
JoinStateWatermarkPredicates is empty (i.e., no state watermark predicates are defined) when StreamingJoinStrategy execution planning strategy is executed (on a stream-stream equi-join).
The actual JoinStateWatermarkPredicates can be assigned later, when IncrementalExecution is requested for the preparation rules to produce an executedPlan query plan (using StreamingSymmetricHashJoinHelper).
StateStoreWriter
StreamingSymmetricHashJoinExec is a StateStoreWriter.
Executing Operator
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan (Spark SQL) abstraction.
doExecute requests the left child physical operator to execute (which produces an RDD[InternalRow]) and zips the result with stateStoreAwareZipPartitions using the following:
- right child physical operator's RDD[InternalRow]
- StatefulOperatorStateInfo (that at this point is supposed to be defined)
- Names of the state stores of the left and right side of the join
- StateStoreCoordinator endpoint
- processPartitions function
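The zipping step can be pictured with plain Scala collections: partition i of the left input is paired with partition i of the right input, and the partition-computing function receives the partition id and both iterators. This is a hypothetical sketch (ZipPartitionsSketch and the Partitions alias are made up for illustration); the real StateStoreAwareZipPartitionsRDD works on RDDs and additionally asks the StateStoreCoordinator for the executors that already host the state stores, to use them as preferred locations.

```scala
object ZipPartitionsSketch {
  // Hypothetical stand-in for an RDD: one Seq of rows per partition.
  type Partitions[T] = IndexedSeq[Seq[T]]

  // Pairs partition i of `left` with partition i of `right` and computes
  // output partition i with f(partitionId, leftIter, rightIter).
  def zipPartitions[L, R, O](left: Partitions[L], right: Partitions[R])(
      f: (Int, Iterator[L], Iterator[R]) => Iterator[O]): Partitions[O] = {
    require(left.size == right.size,
      s"Can't zip inputs with unequal numbers of partitions: ${left.size} vs ${right.size}")
    left.indices.map(i => f(i, left(i).iterator, right(i).iterator).toList)
  }
}
```

Both sides must have the same number of partitions, which is why the operator also constrains how its children are distributed.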
Computing Partition
processPartitions(
  partitionId: Int,
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]
processPartitions is used as the function to compute a partition of StateStoreAwareZipPartitionsRDD.
processPartitions uses the following performance metrics:
- numOutputRows
- numUpdatedStateRows
- number of total state rows
- time to update
- numRemovedStateRows
- time to remove
- time to commit changes
- stateMemory
processPartitions creates the following:
- postJoinFilter predicate
- OneSideHashJoiners for the left and right side of the join
processPartitions requests the left side's OneSideHashJoiner to storeAndJoinWithOtherSide with the right side's OneSideHashJoiner.
processPartitions then does the opposite for the right side (i.e., requests the right side's OneSideHashJoiner to storeAndJoinWithOtherSide with the left side's OneSideHashJoiner).
Note: Executing storeAndJoinWithOtherSide updates an internal JoinedRow variable.
processPartitions creates a CompletionIterator that concatenates all the joined rows (iterators) of the left side first, followed by those of the right side.
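Stripped of state stores, watermarks and outer-join bookkeeping, the left-then-right order above is the classic symmetric hash join: each new row is stored in its own side's state and probed against the other side's state. Processing the left side first means new left rows only see right rows from earlier batches, while new right rows see all left rows including this batch's, so every matching pair is emitted exactly once. A minimal in-memory sketch for an inner join follows; OneSideState and processBatch are hypothetical names standing in for OneSideHashJoiner and its state store, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical per-side state: join key -> all rows seen so far with that key.
// (In Spark, a state store managed by OneSideHashJoiner plays this role.)
final class OneSideState[K, V] {
  private val rows = mutable.Map.empty[K, mutable.ArrayBuffer[V]]

  def add(key: K, value: V): Unit =
    rows.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value

  def get(key: K): Seq[V] = rows.get(key).map(_.toList).getOrElse(Nil)
}

object SymmetricHashJoinSketch {
  // Processes one micro-batch of one partition: left input first, then right.
  def processBatch[K, L, R](
      leftState: OneSideState[K, L],
      rightState: OneSideState[K, R],
      leftInput: Seq[(K, L)],
      rightInput: Seq[(K, R)]): Seq[(K, L, R)] = {
    // Left side: store each new row, then join it with the right state
    // (only earlier batches' right rows, as this batch's are not stored yet).
    val leftOutput = leftInput.flatMap { case (k, l) =>
      leftState.add(k, l)
      rightState.get(k).map(r => (k, l, r))
    }
    // Right side: store each new row, then join it with the left state,
    // which now also contains this batch's left rows.
    val rightOutput = rightInput.flatMap { case (k, r) =>
      rightState.add(k, r)
      leftState.get(k).map(l => (k, l, r))
    }
    // Left-side matches first, followed by right-side matches.
    leftOutput ++ rightOutput
  }
}
```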
processPartitions creates an output iterator with the joined rows based on the JoinType:
- For Inner or LeftSemi join types, the CompletionIterator is used as-is (no changes)
- For...FIXME
processPartitions updates the numOutputRows metric with the number of joined rows in the output iterator.
In the end, processPartitions updates the metrics:
- allRemovalsTimeMs to be the time of removeOldState on the left and/or right sides of the join (based on the JoinType)
- others
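Because the joined rows are produced lazily, output rows can only be counted, and the final bookkeeping (removing old state, committing state changes, updating the remaining metrics) can only run, as downstream operators consume the iterator. As far as the Spark source shows, this is done with a map over the output iterator that increments numOutputRows, wrapped in Spark's internal CompletionIterator with a completion function. The following is a hypothetical, simplified sketch of that pattern; CompletionIteratorSketch, Counter and OutputWithMetricsSketch are made-up names.

```scala
// Hypothetical minimal completion iterator: wraps `underlying` and runs
// `onCompletion` exactly once, when hasNext first reports exhaustion.
final class CompletionIteratorSketch[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onCompletion()
    }
    more
  }

  override def next(): A = underlying.next()
}

// Hypothetical stand-in for an SQLMetric counter.
final class Counter {
  var value: Long = 0L
  def add(n: Long): Unit = value += n
}

object OutputWithMetricsSketch {
  // Counts rows as they are pulled and runs `onOutputCompletion`
  // (e.g. state cleanup, commit, final metrics) once all output is consumed.
  def wrap[A](joined: Iterator[A], numOutputRows: Counter)(onOutputCompletion: => Unit): Iterator[A] = {
    val counted = joined.map { row => numOutputRows.add(1); row }
    new CompletionIteratorSketch(counted, () => onOutputCompletion)
  }
}
```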
processPartitions throws an IllegalStateException when the StatefulOperatorStateInfo has not been defined:
Cannot execute join as state info was not specified
[this]
Short Name
shortName: String
shortName is part of the StateStoreWriter abstraction.
shortName is the following text:
symmetricHashJoin
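Required Child Output Distribution
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is part of the SparkPlan (Spark SQL) abstraction.
requiredChildDistribution is two StatefulOpClusteredDistributions, one for the left keys and one for the right keys, both with the numPartitions of the StatefulOperatorStateInfo. That way, rows of both sides with the same join key end up in the same partition (and so are handled by the same partition of the state stores).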
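Why this matters can be shown with a hash-by-key partitioner: if both children are partitioned by the join key into the same number of partitions, every left row and every right row sharing a key meet in the same task. A hypothetical sketch (ClusteredByKeySketch is a made-up name); Spark's HashPartitioning uses a Murmur3 hash rather than hashCode, which is used here only to keep the example self-contained.

```scala
object ClusteredByKeySketch {
  // Hypothetical: assigns a row to a partition based only on its join key.
  // floorMod keeps the result in [0, numPartitions) even for negative hashes.
  def partitionFor(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Groups rows by the partition their key maps to.
  def partition[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```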
Should Run Another Non-Data Batch
shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
shouldRunAnotherBatch is positive (true) when all of the following hold:
- Either the left or the right join state watermark predicate is defined (in the JoinStateWatermarkPredicates)
- This StreamingSymmetricHashJoinExec operator has an event-time watermark defined, and the current event-time watermark threshold (batchWatermarkMs) of the given OffsetSeqMetadata is greater than it, i.e. the watermark has advanced
shouldRunAnotherBatch is negative (false) otherwise.
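The decision boils down to two boolean checks combined with "and". A hypothetical sketch, where WatermarkPredicates stands in for JoinStateWatermarkPredicates (with the predicates modelled as plain strings) and the new batch watermark is passed as a Long rather than an OffsetSeqMetadata:

```scala
object ShouldRunAnotherBatchSketch {
  // Hypothetical stand-in for JoinStateWatermarkPredicates: an optional
  // state-cleanup predicate per side, modelled here as a plain description.
  final case class WatermarkPredicates(left: Option[String], right: Option[String])

  def shouldRunAnotherBatch(
      predicates: WatermarkPredicates,
      eventTimeWatermark: Option[Long], // watermark the operator was planned with
      newBatchWatermarkMs: Long         // watermark of the next batch's metadata
  ): Boolean = {
    // State cleanup depends on the watermark only if some predicate is defined.
    val watermarkUsedForStateCleanup = predicates.left.nonEmpty || predicates.right.nonEmpty
    // The new watermark must be strictly greater than the current one.
    val watermarkHasChanged = eventTimeWatermark.exists(current => newBatchWatermarkMs > current)
    watermarkUsedForStateCleanup && watermarkHasChanged
  }
}
```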
Performance Metrics
StreamingSymmetricHashJoinExec uses the same performance metrics as the other stateful physical operators that write to a state store.