SymmetricHashJoinStateManager

SymmetricHashJoinStateManager manages the join state of one side of a stream-stream join. StreamingSymmetricHashJoinExec physical operator uses two OneSideHashJoiners (one for each side) when requested to process partitions of the left and right sides of a stream-stream join, and each OneSideHashJoiner has its own SymmetricHashJoinStateManager.

SymmetricHashJoinStateManager is created and used exclusively by OneSideHashJoiner (and reflects the arguments of the owning OneSideHashJoiner, e.g., joinSide, inputValueAttributes, joinKeys, stateInfo, partitionId).

SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and KeyWithIndexToValueStore state store handlers (and acts like their facade).
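The two-store layout can be modelled in memory as follows. This is a minimal sketch under stated assumptions, not Spark's implementation: StateManagerSketch is a hypothetical name, keys and values are plain Strings (Spark uses UnsafeRow), and mutable maps stand in for the two state store handlers.

```scala
import scala.collection.mutable

object JoinStateSketch {
  // Hypothetical in-memory stand-in for the facade over the two handlers.
  final class StateManagerSketch {
    // key -> number of values buffered for that key
    private val keyToNumValues = mutable.Map.empty[String, Long]
    // (key, index) -> (value, matched flag)
    private val keyWithIndexToValue =
      mutable.Map.empty[(String, Long), (String, Boolean)]

    // Stores the value at the next free index of the key, then bumps the count.
    def append(key: String, value: String, matched: Boolean): Unit = {
      val numExisting = keyToNumValues.getOrElse(key, 0L)
      keyWithIndexToValue((key, numExisting)) = (value, matched)
      keyToNumValues(key) = numExisting + 1
    }

    // Reads the count first, then fetches the values at indices 0 until count.
    def get(key: String): Iterator[String] = {
      val numValues = keyToNumValues.getOrElse(key, 0L)
      (0L until numValues).iterator.map(i => keyWithIndexToValue((key, i))._1)
    }
  }
}
```

Keeping the count in a separate store lets a lookup enumerate the values of a key by index, without scanning the whole value store.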

Figure: SymmetricHashJoinStateManager and Stream-Stream Join

Creating Instance

SymmetricHashJoinStateManager takes the following to be created:

SymmetricHashJoinStateManager is created when:

JoinSide

When created, SymmetricHashJoinStateManager is given a JoinSide marker that indicates the join side (of the parent OneSideHashJoiner).

| JoinSide | Alias |
|----------|-------|
| LeftSide | left |
| RightSide | right |
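The markers and their aliases can be sketched as a sealed trait with two case objects whose toString gives the alias. The storeName helper is hypothetical and only illustrates how an alias could be used to tell apart the state stores of the two sides.

```scala
object JoinSideSketch {
  sealed trait JoinSide
  case object LeftSide extends JoinSide { override def toString: String = "left" }
  case object RightSide extends JoinSide { override def toString: String = "right" }

  // Hypothetical helper: builds a per-side store name from the alias.
  def storeName(side: JoinSide, storeType: String): String = s"$side-$storeType"
}
```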

KeyToNumValuesStore

SymmetricHashJoinStateManager creates a KeyToNumValuesStore when created.

KeyWithIndexToValueStore

SymmetricHashJoinStateManager creates a KeyWithIndexToValueStore (for the stateFormatVersion) when created.

getJoinedRows

getJoinedRows(
  key: UnsafeRow,
  generateJoinedRow: InternalRow => JoinedRow,
  predicate: JoinedRow => Boolean,
  excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow]

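getJoinedRows requests the keyToNumValues for the number of values of the given key and requests the keyWithIndexToValue for all the values of the key. With excludeRowsAlreadyMatched enabled, getJoinedRows skips the values that are already marked as matched. For every remaining value, getJoinedRows applies generateJoinedRow and returns only the joined rows that satisfy the predicate (marking the corresponding value as matched in the keyWithIndexToValue).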
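A minimal sketch of how the parameters of getJoinedRows could interact, assuming the values of a single key are already available as a buffer. Buffered and Joined are hypothetical stand-ins for the stored value (with its matched flag) and JoinedRow. The iterator is lazy, so the matched flags change only as the result is consumed.

```scala
import scala.collection.mutable

object GetJoinedRowsSketch {
  // Hypothetical stand-ins: a buffered value with its matched flag, and a joined pair.
  final case class Buffered(value: String, var matched: Boolean)
  final case class Joined(input: String, buffered: String)

  // buffer: the values stored for one key on this side of the join
  def getJoinedRows(
      buffer: mutable.ArrayBuffer[Buffered],
      generateJoinedRow: String => Joined,
      predicate: Joined => Boolean,
      excludeRowsAlreadyMatched: Boolean = false): Iterator[Joined] =
    buffer.iterator
      .filterNot(b => excludeRowsAlreadyMatched && b.matched)
      .flatMap { b =>
        val joined = generateJoinedRow(b.value)
        if (predicate(joined)) {
          b.matched = true // remember that this buffered value found a match
          Some(joined)
        } else None
      }
}
```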


getJoinedRows is used when:

append

append(
  key: UnsafeRow,
  value: UnsafeRow,
  matched: Boolean): Unit

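append requests the keyToNumValues for the number of values of the given key, requests the keyWithIndexToValue to store the value (with the matched flag) at that index, and requests the keyToNumValues to increment the number of values of the key.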


append is used when:

removeByKeyCondition

removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]

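removeByKeyCondition returns an iterator that goes over all the keys of the keyToNumValues and, for every key that satisfies the removalCondition, removes the key with all its values (from the keyToNumValues and keyWithIndexToValue) and returns the removed key-value pairs.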


removeByKeyCondition is used when:

  • OneSideHashJoiner is requested to removeOldState (for JoinStateKeyWatermarkPredicate)
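Key-based removal can be sketched over an in-memory map as follows. This is an illustration only: the Long key plays the role of an event-time column in the join key, the map stands in for the state stores, and this KeyToValuePair is a simplified local case class rather than Spark's.

```scala
import scala.collection.mutable

object RemoveByKeySketch {
  final case class KeyToValuePair(key: Long, value: String)

  // state: key -> values buffered for the key
  def removeByKeyCondition(
      state: mutable.Map[Long, Vector[String]],
      removalCondition: Long => Boolean): Iterator[KeyToValuePair] = {
    // Snapshot the matching keys first so the map is not mutated while scanned.
    val expiredKeys = state.keys.filter(removalCondition).toList
    expiredKeys.iterator.flatMap { key =>
      // Drops the key with all its values and emits every removed pair.
      state.remove(key).getOrElse(Vector.empty).iterator.map(v => KeyToValuePair(key, v))
    }
  }
}
```

The removal happens while the returned iterator is consumed, so a caller has to drain it for the state to actually shrink.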

removeByValueCondition

removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[KeyToValuePair]

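removeByValueCondition returns an iterator that goes over all the values of all the keys and removes (and returns) every value that satisfies the removalCondition. A removed value is replaced with the last value of the key so that the indices of the remaining values stay contiguous, and the number of values of the key (in the keyToNumValues) is updated accordingly.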


removeByValueCondition is used when:

  • OneSideHashJoiner is requested to removeOldState (for JoinStateValueWatermarkPredicate)
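Because values are addressed by (key, index), removing one from the middle would leave a hole in the index range. One way to avoid that, sketched here for the values of a single key, is to move the last value into the freed slot. The ArrayBuffer of Long timestamps is an assumption made for illustration; it stands in for the indexed values of one key.

```scala
import scala.collection.mutable

object RemoveByValueSketch {
  // values: the values of one key, addressed by index 0 until values.length.
  // Returns the removed values; `values` stays densely indexed afterwards.
  def removeByValueCondition(
      values: mutable.ArrayBuffer[Long],
      removalCondition: Long => Boolean): List[Long] = {
    val removed = mutable.ListBuffer.empty[Long]
    var index = 0
    while (index < values.length) {
      if (removalCondition(values(index))) {
        removed += values(index)
        val last = values.length - 1
        values(index) = values(last) // move the last value into the hole
        values.remove(last)          // shrink the index range by one
        // index is not advanced: the moved value has to be checked as well
      } else {
        index += 1
      }
    }
    removed.toList
  }
}
```

The order of the remaining values is not preserved, which is acceptable as long as a lookup only needs the full set of values of a key.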

get

get(
  key: UnsafeRow): Iterator[UnsafeRow]

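get requests the keyToNumValues for the number of values of the given key and requests the keyWithIndexToValue for all the values of the key (up to that number).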


get is used when:

  • OneSideHashJoiner is requested to get

Committing (State) Changes

commit(): Unit

commit requests the keyToNumValues and keyWithIndexToValue to commit.


commit is used when:

Performance Metrics

metrics: StateStoreMetrics

metrics requests the keyToNumValues and keyWithIndexToValue for the metrics.

metrics creates a StateStoreMetrics:

| Metric | Value |
|--------|-------|
| Number of Keys | Number of Keys of the keyWithIndexToValue |
| Memory used (in bytes) | Total of the Memory used (in bytes) of the keyToNumValues and keyWithIndexToValue |
| Custom Metrics | The description of all the Custom Metrics of the keyWithIndexToValue prefixed with the JoinSide |
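The combination of the metrics of the two handlers can be sketched as follows. Metrics is a hypothetical, simplified stand-in for StateStoreMetrics (custom metrics are reduced to a description-to-value map), and the exact prefix format is an assumption.

```scala
object JoinMetricsSketch {
  // Hypothetical, simplified stand-in for StateStoreMetrics.
  final case class Metrics(
      numKeys: Long,
      memoryUsedBytes: Long,
      customMetrics: Map[String, Long])

  def combine(joinSide: String, keyToNumValues: Metrics, keyWithIndexToValue: Metrics): Metrics =
    Metrics(
      // Number of keys comes from keyWithIndexToValue only.
      numKeys = keyWithIndexToValue.numKeys,
      // Memory is the total over both handlers.
      memoryUsedBytes = keyToNumValues.memoryUsedBytes + keyWithIndexToValue.memoryUsedBytes,
      // Custom metrics come from keyWithIndexToValue, with the side as a prefix.
      customMetrics = keyWithIndexToValue.customMetrics.map {
        case (desc, value) => (s"$joinSide: $desc", value)
      })
}
```

Taking the number of keys from the keyWithIndexToValue alone means every buffered row is counted once rather than once per handler.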

metrics is used when: