SymmetricHashJoinStateManager
SymmetricHashJoinStateManager is used for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).
SymmetricHashJoinStateManager is created and used exclusively by OneSideHashJoiner (and reflects the arguments of the owning OneSideHashJoiner, e.g., joinSide, inputValueAttributes, joinKeys, stateInfo, partitionId).
SymmetricHashJoinStateManager manages the join state using two state store handlers, KeyToNumValuesStore and KeyWithIndexToValueStore, and acts as a facade over them.
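To make the split between the two stores concrete, here is a toy model. It is an assumption for illustration, not Spark's code: String keys and values stand in for UnsafeRows, and immutable Maps stand in for the state stores. The hypothetical `ToyLayout.split` takes a logical view of the join state (every join key with all its buffered values) and derives what each store would hold. KeyToNumValuesStore keeps one count per key. KeyWithIndexToValueStore keeps one entry per buffered value, addressed by the key and an index from 0 until that count.

```scala
// Hypothetical toy model (not Spark's classes): String keys and values stand in
// for UnsafeRows, and immutable Maps stand in for the two state stores.
object ToyLayout {
  type KeyToNumValues = Map[String, Long]
  type KeyWithIndexToValue = Map[(String, Long), String]

  // Split a logical multimap (join key -> all buffered values) into the two stores
  def split(logical: Map[String, Seq[String]]): (KeyToNumValues, KeyWithIndexToValue) = {
    // One entry per join key: how many values are buffered under it
    val counts: KeyToNumValues =
      logical.map { case (key, values) => key -> values.size.toLong }
    // One entry per buffered value, addressed by (join key, index in 0 until count)
    val indexed: KeyWithIndexToValue =
      logical.toSeq.flatMap { case (key, values) =>
        values.zipWithIndex.map { case (value, i) => (key, i.toLong) -> value }
      }.toMap
    (counts, indexed)
  }
}
```

For example, `ToyLayout.split(Map("k1" -> Seq("a", "b")))` yields a count of 2 for `k1` and the entries `("k1", 0L) -> "a"` and `("k1", 1L) -> "b"`.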

Creating Instance
SymmetricHashJoinStateManager takes the following to be created:
- JoinSide
- Input Value Attributes
- Join Keys (Seq[Expression])
- StatefulOperatorStateInfo
- StateStoreConf
- Hadoop Configuration
- Partition ID
- State Format Version
SymmetricHashJoinStateManager is created when:
- OneSideHashJoiner is created
JoinSide
When created, SymmetricHashJoinStateManager is given a JoinSide marker that indicates the side of the join (of the parent OneSideHashJoiner) it manages state for.
| JoinSide | Alias |
|---|---|
| LeftSide | left |
| RightSide | right |
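The following is a minimal sketch of such a marker. It assumes, as the Alias column suggests, that each side's `toString` returns its alias. `ToyStoreNames.storeName` is hypothetical. It only shows how an alias could prefix per-side state store names, and the exact naming scheme is an assumption.

```scala
// Sketch of a JoinSide marker whose toString is the alias from the table.
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString(): String = "left" }
case object RightSide extends JoinSide { override def toString(): String = "right" }

object ToyStoreNames {
  // Hypothetical helper: prefix a store type with the side's alias
  def storeName(side: JoinSide, storeType: String): String = s"$side-$storeType"
}
```

With a sealed trait, a `match` over `LeftSide` and `RightSide` is checked for exhaustiveness by the compiler.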
KeyToNumValuesStore
SymmetricHashJoinStateManager creates a KeyToNumValuesStore when created.
KeyWithIndexToValueStore
SymmetricHashJoinStateManager creates a KeyWithIndexToValueStore (for the stateFormatVersion) when created.
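The source does not say what differs between state format versions. This sketch rests on two things: the `matched` parameter of append, and Spark 3.0 adding a per-value matched flag to the stream-stream join state. On that assumption, it models version 1 as storing the value row alone and version 2 as appending a boolean matched column. `Seq[Any]` stands in for an UnsafeRow, and `ToyValueFormat` is hypothetical.

```scala
// Hypothetical sketch of a version-dependent value encoding (assumption:
// v1 = value row only, v2 = value row plus a trailing matched flag).
object ToyValueFormat {
  def encode(version: Int, value: Seq[Any], matched: Boolean): Seq[Any] = version match {
    case 1 => value            // no room for the flag: it is simply dropped
    case 2 => value :+ matched // flag stored as an extra trailing column
    case other => throw new IllegalArgumentException(s"Unsupported state format version: $other")
  }

  def decode(version: Int, stored: Seq[Any]): (Seq[Any], Boolean) = version match {
    case 1 => (stored, false)  // v1 keeps no flag, so every value reads as unmatched
    case 2 => (stored.init, stored.last.asInstanceOf[Boolean])
    case other => throw new IllegalArgumentException(s"Unsupported state format version: $other")
  }
}
```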
getJoinedRows
```scala
getJoinedRows(
  key: UnsafeRow,
  generateJoinedRow: InternalRow => JoinedRow,
  predicate: JoinedRow => Boolean,
  excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow]
```
getJoinedRows...FIXME
getJoinedRows is used when:
- OneSideHashJoiner is requested to storeAndJoinWithOtherSide
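The source does not describe getJoinedRows, so what follows is only a toy model of what its signature suggests. It uses simplified types (String keys, Int values, an `(Int, Int)` tuple in place of JoinedRow) and hypothetical names. It walks the values buffered for the key and optionally skips those already matched (`excludeRowsAlreadyMatched`). It joins each remaining value with `generateJoinedRow`, keeps the rows that pass `predicate`, and flags the stored value as matched. Treat the matched-flag update as an assumption about the real implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stored value plus its matched flag
final case class Stored(value: Int, matched: Boolean)

class ToyJoinedRowsState {
  val keyToNumValues = mutable.Map.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), Stored]

  def getJoinedRows(
      key: String,
      generateJoinedRow: Int => (Int, Int),
      predicate: ((Int, Int)) => Boolean,
      excludeRowsAlreadyMatched: Boolean = false): Iterator[(Int, Int)] = {
    val numValues = keyToNumValues.getOrElse(key, 0L)
    (0L until numValues).iterator
      .map(i => (i, keyWithIndexToValue((key, i))))
      // Optionally skip values that have already produced a joined row
      .filterNot { case (_, stored) => excludeRowsAlreadyMatched && stored.matched }
      .flatMap { case (i, stored) =>
        val joined = generateJoinedRow(stored.value)
        if (predicate(joined)) {
          // Record that this stored value has found a match (lazily, as rows are consumed)
          if (!stored.matched) keyWithIndexToValue.update((key, i), stored.copy(matched = true))
          Some(joined)
        } else None
      }
  }
}
```

Because the result is a lazy iterator, the matched flags in this sketch change only as the joined rows are consumed.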
append
```scala
append(
  key: UnsafeRow,
  value: UnsafeRow,
  matched: Boolean): Unit
```
append...FIXME
append is used when:
- OneSideHashJoiner is requested to storeAndJoinWithOtherSide
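Here is a toy model of append. It assumes the two-store layout: a count per key plus one entry per (key, index). The current count for the key doubles as the index of the next free slot, so append writes the value there and then increments the count. String keys, Int values, the mutable Maps and `ToyValue` are hypothetical stand-ins.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a buffered value and its matched flag
final case class ToyValue(value: Int, matched: Boolean)

class ToyAppendState {
  // join key -> number of buffered values
  val keyToNumValues = mutable.Map.empty[String, Long]
  // (join key, index) -> buffered value
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), ToyValue]

  def append(key: String, value: Int, matched: Boolean): Unit = {
    // The current count is also the index of the next free slot
    val numExistingValues = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue.update((key, numExistingValues), ToyValue(value, matched))
    keyToNumValues.update(key, numExistingValues + 1)
  }
}
```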
removeByKeyCondition
```scala
removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
```
removeByKeyCondition...FIXME
removeByKeyCondition is used when:
- OneSideHashJoiner is requested to removeOldState (for JoinStateKeyWatermarkPredicate)
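Here is a toy model of removeByKeyCondition under the same assumed layout. Every key that satisfies `removalCondition` loses its count and all of its indexed values, and each removed value comes back as a pair. In removeOldState the condition is a watermark check on the join key, so this sketch uses Long event-time keys. All names are hypothetical. Removal happens lazily as the returned iterator is consumed, so a caller must drain it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KeyToValuePair
final case class ToyKeyToValuePair(key: Long, value: String)

class ToyRemoveByKeyState {
  val keyToNumValues = mutable.Map.empty[Long, Long]
  val keyWithIndexToValue = mutable.Map.empty[(Long, Long), String]

  def removeByKeyCondition(removalCondition: Long => Boolean): Iterator[ToyKeyToValuePair] = {
    // Snapshot the matching keys first so no map is mutated while it is iterated
    val doomed = keyToNumValues.keys.filter(removalCondition).toList
    doomed.iterator.flatMap { key =>
      val numValues = keyToNumValues.remove(key).getOrElse(0L)
      (0L until numValues).iterator.map { i =>
        ToyKeyToValuePair(key, keyWithIndexToValue.remove((key, i)).get)
      }
    }
  }
}
```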
removeByValueCondition
```scala
removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]
```
removeByValueCondition...FIXME
removeByValueCondition is used when:
- OneSideHashJoiner is requested to removeOldState (for JoinStateValueWatermarkPredicate)
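Here is a toy model of removeByValueCondition. Unlike key-based removal, only some values of a key may be removed, which would leave gaps in the indices. This sketch assumes each gap is closed by moving the key's last value into the freed slot and decrementing the count. That choice is an assumption, picked because it keeps indices contiguous with constant work per removal. A key's count is dropped once no values remain. For simplicity the sketch removes eagerly and returns the removed pairs afterwards. Names and types are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KeyToValuePair
final case class ToyPair(key: String, value: Int)

class ToyRemoveByValueState {
  val keyToNumValues = mutable.Map.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), Int]

  def removeByValueCondition(removalCondition: Int => Boolean): Iterator[ToyPair] = {
    val removed = mutable.ListBuffer.empty[ToyPair]
    for (key <- keyToNumValues.keys.toList) {
      var numValues = keyToNumValues(key)
      var i = 0L
      while (i < numValues) {
        val value = keyWithIndexToValue((key, i))
        if (removalCondition(value)) {
          removed += ToyPair(key, value)
          val last = numValues - 1
          // Fill the hole with the last value (nothing to move if i is the last index)
          if (i != last) keyWithIndexToValue.update((key, i), keyWithIndexToValue((key, last)))
          keyWithIndexToValue.remove((key, last))
          numValues -= 1
          // Do not advance i: the value just moved into slot i still has to be checked
        } else {
          i += 1
        }
      }
      if (numValues == 0L) keyToNumValues.remove(key) else keyToNumValues.update(key, numValues)
    }
    removed.iterator
  }
}
```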
get
```scala
get(
  key: UnsafeRow): Iterator[UnsafeRow]
```
get...FIXME
get is used when:
- OneSideHashJoiner is requested to get
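Here is a toy model of get under the assumed two-store layout. It reads the number of values for the key, then looks up indices 0 until that number. A key with no count yields an empty iterator. Names and types (String keys, Int values) are hypothetical.

```scala
import scala.collection.mutable

class ToyGetState {
  val keyToNumValues = mutable.Map.empty[String, Long]
  val keyWithIndexToValue = mutable.Map.empty[(String, Long), Int]

  def get(key: String): Iterator[Int] = {
    // An unknown key has zero values, so the range (and the iterator) is empty
    val numValues = keyToNumValues.getOrElse(key, 0L)
    (0L until numValues).iterator.map(i => keyWithIndexToValue((key, i)))
  }
}
```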
Committing (State) Changes
```scala
commit(): Unit
```
commit requests the keyToNumValues and keyWithIndexToValue to commit.
commit is used when:
- OneSideHashJoiner is requested to commitStateAndGetMetrics
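Since commit only delegates, a hypothetical sketch needs little more than a stand-in store trait. `ToyStateStore` and `ToyJoinStateManager` are illustrative names, not Spark APIs. In this sketch the stores commit in the order listed above, and an exception from the first commit skips the second.

```scala
// Hypothetical stand-in for a state store handler that can commit
trait ToyStateStore {
  def commit(): Long // e.g. the committed version
}

final class ToyJoinStateManager(
    keyToNumValues: ToyStateStore,
    keyWithIndexToValue: ToyStateStore) {
  // Commit both underlying stores, one after the other
  def commit(): Unit = {
    keyToNumValues.commit()
    keyWithIndexToValue.commit()
  }
}
```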
Performance Metrics
```scala
metrics: StateStoreMetrics
```
metrics requests the keyToNumValues and keyWithIndexToValue for their metrics.
metrics combines them into a StateStoreMetrics as follows:
| Metric | Value |
|---|---|
| Number of Keys | Number of Keys of the keyWithIndexToValue |
| Memory used (in bytes) | Total of the Memory used (in bytes) of the keyToNumValues and keyWithIndexToValue |
| Custom Metrics | All the Custom Metrics of the keyWithIndexToValue, each with its description prefixed with the JoinSide |
metrics is used when:
- OneSideHashJoiner is requested to commitStateAndGetMetrics
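Here is a hypothetical sketch of how metrics combines the two stores' metrics, following the table above. `ToyStoreMetrics` is a simplified stand-in for StateStoreMetrics, with custom metrics keyed by description. The exact prefix format (`"<side> <description>"`) is an assumption. Taking the number of keys from keyWithIndexToValue alone follows from the layout. That store holds one entry per buffered row, so its key count is the number of buffered rows, whereas keyToNumValues only counts distinct join keys.

```scala
// Hypothetical, simplified stand-in for StateStoreMetrics
final case class ToyStoreMetrics(numKeys: Long, memoryUsedBytes: Long, customMetrics: Map[String, Long])

object ToyMetrics {
  def combine(
      joinSide: String,
      keyToNumValues: ToyStoreMetrics,
      keyWithIndexToValue: ToyStoreMetrics): ToyStoreMetrics =
    ToyStoreMetrics(
      // One entry per buffered row, so this counts buffered rows
      numKeys = keyWithIndexToValue.numKeys,
      // Memory used by both stores together
      memoryUsedBytes = keyToNumValues.memoryUsedBytes + keyWithIndexToValue.memoryUsedBytes,
      // Custom metrics of keyWithIndexToValue, descriptions prefixed with the join side
      customMetrics = keyWithIndexToValue.customMetrics.map { case (desc, value) => s"$joinSide $desc" -> value })
}
```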