OneSideHashJoiner

`OneSideHashJoiner` manages join state of one side of a <

`OneSideHashJoiner` is <
.OneSideHashJoiner and StreamingSymmetricHashJoinExec
image::images/OneSideHashJoiner.png[align="center"]

`StreamingSymmetricHashJoinExec` physical operator uses two `OneSideHashJoiner` instances, one per side of the stream-stream join (<

`OneSideHashJoiner` uses an <

NOTE: `OneSideHashJoiner` is a Scala private internal class of <StreamingSymmetricHashJoinExec properties.

Creating OneSideHashJoiner Instance

`OneSideHashJoiner` takes the following to be created:

- [[joinSide]] JoinSide
- [[inputAttributes]] Input attributes (`Seq[Attribute]`)
- [[joinKeys]] Join keys (`Seq[Expression]`)
- [[inputIter]] Input rows (`Iterator[InternalRow]`)
- [[preJoinFilterExpr]] Optional pre-join filter Catalyst expression
- [[postJoinFilter]] Post-join filter (`(InternalRow) => Boolean`)
- <>

`OneSideHashJoiner` initializes the <

=== [[joinStateManager]] SymmetricHashJoinStateManager -- `joinStateManager` Internal Property

[source, scala]
joinStateManager: SymmetricHashJoinStateManager

`joinStateManager` is a SymmetricHashJoinStateManager that is created for a `OneSideHashJoiner` (with the <

`joinStateManager` is used when `OneSideHashJoiner` is requested for the following:

- <>
- <>
- <>
- <>

=== [[updatedStateRowsCount]] Number of Updated State Rows -- `updatedStateRowsCount` Internal Counter

`updatedStateRowsCount` is the number of join keys and associated rows that were persisted as join state, i.e. how many times <

`updatedStateRowsCount` is then used (via <

`updatedStateRowsCount` is available via the `numUpdatedStateRows` method.

[[numUpdatedStateRows]]
[source, scala]
numUpdatedStateRows: Long

NOTE: `numUpdatedStateRows` is used exclusively when `StreamingSymmetricHashJoinExec` physical operator is requested to <

=== [[stateWatermarkPredicate]] Optional Join State Watermark Predicate -- `stateWatermarkPredicate` Internal Property

[source, scala]
stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]

When <OneSideHashJoiner is given a <

`stateWatermarkPredicate` is used for the <OneSideHashJoiner is requested to <

=== [[storeAndJoinWithOtherSide]] `storeAndJoinWithOtherSide` Method

[source, scala]
storeAndJoinWithOtherSide(
  otherSideJoiner: OneSideHashJoiner)(
  generateJoinedRow: (InternalRow, InternalRow) => JoinedRow): Iterator[InternalRow]

`storeAndJoinWithOtherSide` tries to find the watermark attribute among the input attributes.

`storeAndJoinWithOtherSide` creates a watermark expression (for the watermark attribute and the current event-time watermark).

[[storeAndJoinWithOtherSide-nonLateRows]] With the watermark attribute found, `storeAndJoinWithOtherSide` generates a new predicate for the watermark expression and the <

[[storeAndJoinWithOtherSide-nonLateRows-flatMap]] For every <storeAndJoinWithOtherSide applies the <

NOTE: `storeAndJoinWithOtherSide` is used when `StreamingSymmetricHashJoinExec` physical operator is requested to <

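The late-row handling described above can be pictured with a minimal, Spark-free sketch. It is not the operator's code: `Event`, `LateRowFilterSketch`, `isLate`, and `nonLateRows` are hypothetical names, and the sketch assumes that a row counts as late when its event time is at or below the current watermark, and that no filtering happens when no watermark attribute is found.

```scala
// Hypothetical stand-in for an input row that carries an event-time column (microseconds).
final case class Event(key: String, eventTimeUs: Long)

object LateRowFilterSketch {
  // Assumption: a row is "late" when its event time is at or below the watermark.
  def isLate(watermarkUs: Long, row: Event): Boolean =
    row.eventTimeUs <= watermarkUs

  // With a watermark available, keep only the non-late rows;
  // without one (no watermark attribute found), pass every row through.
  def nonLateRows(input: Iterator[Event], watermarkUs: Option[Long]): Iterator[Event] =
    watermarkUs match {
      case Some(wm) => input.filterNot(row => isLate(wm, row))
      case None     => input
    }
}
```

The `Option` models "watermark attribute found or not", so the no-watermark case stays an explicit pass-through rather than a special value.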
==== [[preJoinFilter-true]] `preJoinFilter` Predicate Positive (`true`)

When the <storeAndJoinWithOtherSide extracts the join key (using the <OneSideHashJoiner (`otherSideJoiner`) for the <generateJoinedRow function and then filtered by the <

`storeAndJoinWithOtherSide` uses the <storeAndJoinWithOtherSide increments the <

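To make the flow of this branch easier to follow, here is a small sketch under stated assumptions. It is not Spark code: `Row`, `SideState`, and `storeAndJoin` are hypothetical stand-ins, the join key is a plain `Int`, and the late-row check is reduced to a single `isLateForState` function. The order assumed here is: look up matching rows on the other side, build joined rows, apply the post-join filter, then store the current row (and bump the counter) only if it is not late.

```scala
import scala.collection.mutable

object StoreAndJoinSketch {
  // Hypothetical stand-in for an input row with an already extracted join key.
  final case class Row(key: Int, value: String)

  // Hypothetical stand-in for one side's join state: join key -> buffered rows.
  final class SideState {
    private val rowsByKey = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]
    var updatedStateRowsCount: Long = 0L

    def get(key: Int): Iterator[Row] =
      rowsByKey.get(key).map(_.iterator).getOrElse(Iterator.empty)

    def append(key: Int, row: Row): Unit = {
      rowsByKey.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Row]) += row
      updatedStateRowsCount += 1
    }
  }

  def storeAndJoin(
      thisRow: Row,
      thisSide: SideState,
      otherSide: SideState,
      postJoinFilter: ((Row, Row)) => Boolean,
      isLateForState: Row => Boolean): List[(Row, Row)] = {
    val key = thisRow.key
    // Join the current row with every row buffered for the same key on the other side.
    val joined = otherSide.get(key)
      .map(other => (thisRow, other))
      .filter(postJoinFilter)
      .toList
    // Persist the current row for future matches unless it is already late.
    if (!isLateForState(thisRow)) thisSide.append(key, thisRow)
    joined
  }
}
```

Keeping the counter inside `append` means it tracks exactly the rows that were persisted, which matches the role the text gives to `updatedStateRowsCount`.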
==== [[preJoinFilter-false]] `preJoinFilter` Predicate Negative (`false`)

When the <storeAndJoinWithOtherSide creates a new `Iterator[InternalRow]` of joined rows per <

- For `LeftSide` and `LeftOuter`, the join row is the current row with the values of the right side all `null` (`nullRight`)
- For `RightSide` and `RightOuter`, the join row is the current row with the values of the left side all `null` (`nullLeft`)
- For all other combinations, the iterator is simply empty (which is removed from the output by the outer <>).

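The three cases above map naturally onto a pattern match over the join side and the join type. The following sketch is illustrative only: the sealed traits are simplified stand-ins (with `Inner` included merely as an example of "any other combination"), rows are plain strings, and `None` models a side whose values are all `null`. The function name `rowsForFilteredOutInput` is hypothetical.

```scala
object PreJoinFilterNegativeSketch {
  sealed trait JoinSide
  case object LeftSide extends JoinSide
  case object RightSide extends JoinSide

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // A joined row as (left, right); None models a side with all values null.
  type Joined = (Option[String], Option[String])

  def rowsForFilteredOutInput(
      side: JoinSide,
      joinType: JoinType,
      thisRow: String): Iterator[Joined] =
    (side, joinType) match {
      // Left row of a left outer join: pad the right side with nulls.
      case (LeftSide, LeftOuter)   => Iterator((Some(thisRow), None))
      // Right row of a right outer join: pad the left side with nulls.
      case (RightSide, RightOuter) => Iterator((None, Some(thisRow)))
      // Any other combination contributes nothing to the output.
      case _                       => Iterator.empty
    }
}
```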
=== [[removeOldState]] Removing Old State -- `removeOldState` Method

[source, scala]
removeOldState(): Iterator[UnsafeRowPair]

`removeOldState` branches off per the <

- For <>, `removeOldState` requests the <> to `removeByKeyCondition` (with the <>)
- For <>, `removeOldState` requests the <> to `removeByValueCondition` (with the <>)
- For any other predicates, `removeOldState` returns an empty iterator (no rows to process)

NOTE: `removeOldState` is used exclusively when `StreamingSymmetricHashJoinExec` physical operator is requested to <

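The branching above can be sketched as a match over an optional predicate. This is a simplified model and not the Spark implementation: `OnKey`, `OnValue`, and `State` are hypothetical stand-ins, state rows are plain `(Int, String)` pairs, and only the method names `removeByKeyCondition` and `removeByValueCondition` are taken from the text.

```scala
object RemoveOldStateSketch {
  // Hypothetical stand-ins for a key-based and a value-based watermark predicate.
  sealed trait WatermarkPredicate
  final case class OnKey(isOld: Int => Boolean) extends WatermarkPredicate
  final case class OnValue(isOld: String => Boolean) extends WatermarkPredicate

  // Hypothetical stand-in for the join state: a list of (key, value) pairs.
  final class State(initial: Seq[(Int, String)]) {
    private var rows: List[(Int, String)] = initial.toList

    def remaining: List[(Int, String)] = rows

    // Drops the matching pairs from the state and returns them.
    private def removeWhere(p: ((Int, String)) => Boolean): Iterator[(Int, String)] = {
      val (removed, kept) = rows.partition(p)
      rows = kept
      removed.iterator
    }

    def removeByKeyCondition(cond: Int => Boolean): Iterator[(Int, String)] =
      removeWhere { case (k, _) => cond(k) }

    def removeByValueCondition(cond: String => Boolean): Iterator[(Int, String)] =
      removeWhere { case (_, v) => cond(v) }
  }

  def removeOldState(
      state: State,
      predicate: Option[WatermarkPredicate]): Iterator[(Int, String)] =
    predicate match {
      case Some(OnKey(isOld))   => state.removeByKeyCondition(isOld)
      case Some(OnValue(isOld)) => state.removeByValueCondition(isOld)
      case _                    => Iterator.empty // no predicate: no rows to process
    }
}
```

Note that this sketch removes rows eagerly, whereas an iterator-returning API may well do the removal lazily as the iterator is consumed; the sketch does not model that.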
=== [[get]] Retrieving Value Rows For Key -- `get` Method

[source, scala]
get(key: UnsafeRow): Iterator[UnsafeRow]

`get` simply requests the <

NOTE: `get` is used exclusively when `StreamingSymmetricHashJoinExec` physical operator is requested to <

=== [[commitStateAndGetMetrics]] Committing State (Changes) and Requesting Performance Metrics -- `commitStateAndGetMetrics` Method

[source, scala]
commitStateAndGetMetrics(): StateStoreMetrics

`commitStateAndGetMetrics` simply requests the <

NOTE: `commitStateAndGetMetrics` is used when `StreamingSymmetricHashJoinExec` physical operator is requested to <

Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| keyGenerator
a| [[keyGenerator]]
[source, scala]
keyGenerator: UnsafeProjection

Function to project (extract) join keys from an input row

Used when...FIXME

| preJoinFilter
a| [[preJoinFilter]]
[source, scala]
preJoinFilter: InternalRow => Boolean

Used when...FIXME

| stateKeyWatermarkPredicateFunc
a| [[stateKeyWatermarkPredicateFunc]]
[source, scala]
stateKeyWatermarkPredicateFunc: InternalRow => Boolean

Predicate for late rows based on the <

Used for the following:

- <> (to check whether to append a row to the SymmetricHashJoinStateManager)
- <>

| stateValueWatermarkPredicateFunc
a| [[stateValueWatermarkPredicateFunc]]
[source, scala]
stateValueWatermarkPredicateFunc: InternalRow => Boolean

Predicate for late rows based on the <

Used for the following:

- <> (to check whether to append a row to the <>)
- <>

|===