
OneSideHashJoiner

OneSideHashJoiner manages the join state of one side of a stream-stream join (using a SymmetricHashJoinStateManager).

OneSideHashJoiner is used by the StreamingSymmetricHashJoinExec physical operator, which creates two OneSideHashJoiners (one for the left and one for the right side of the join) when requested to process partitions.

There are twice as many OneSideHashJoiners as there are join partitions, each with an optional JoinStateWatermarkPredicate for removing old join state.
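The two-joiners-per-partition relationship can be illustrated with a minimal sketch. The types `Side` and `Joiner` and the function `joinersFor` are hypothetical stand-ins for illustration only, not Spark's actual classes.

```scala
object JoinerCountSketch {
  // Hypothetical stand-ins, not Spark's actual types
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  final case class Joiner(side: Side, partitionId: Int)

  // One left and one right joiner for every join partition
  def joinersFor(numPartitions: Int): Seq[Joiner] =
    (0 until numPartitions).flatMap { partitionId =>
      Seq(Joiner(LeftSide, partitionId), Joiner(RightSide, partitionId))
    }
}
```

For example, `JoinerCountSketch.joinersFor(4)` yields eight joiners, two for each of the four partitions.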

Figure: OneSideHashJoiner and StreamingSymmetricHashJoinExec

OneSideHashJoiner can be given a JoinStateWatermarkPredicate to remove old state.

Scala private class

OneSideHashJoiner is a private class of StreamingSymmetricHashJoinExec physical operator (with full access to StreamingSymmetricHashJoinExec's properties).
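As a rough illustration of what "full access" means for a private inner class in Scala, consider the sketch below. `Operator`, `SideJoiner`, and the property used are hypothetical and only mirror the nesting described above. They are not the actual Spark definitions.

```scala
object InnerClassSketch {
  // Hypothetical operator; only the nesting mirrors the text
  class Operator(val stateFormatVersion: Int) {

    // Private inner class: visible only inside Operator
    private class SideJoiner(side: String) {
      // The inner class reads the enclosing instance's property directly
      def describe: String = s"$side joiner, state format v$stateFormatVersion"
    }

    // The outer class creates one inner instance per join side
    def describeSides: Seq[String] =
      Seq(new SideJoiner("left").describe, new SideJoiner("right").describe)
  }
}
```

Each inner instance is tied to the enclosing `Operator` instance, so no property has to be passed in explicitly.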

Creating Instance

OneSideHashJoiner takes the following to be created:

  • JoinSide
  • Input Attributes
  • Join Keys (Expressions)
  • Input (Iterator of) InternalRows
  • Pre-Join Filter Expression
  • Post-Join Filter Function ((InternalRow) => Boolean)
  • JoinStateWatermarkPredicate
  • Partition ID

OneSideHashJoiner is created when:

  • StreamingSymmetricHashJoinExec physical operator is requested to processPartitions (and creates two OneSideHashJoiners for the left and right side of the join)

Join State Watermark Predicate

OneSideHashJoiner can be given a JoinStateWatermarkPredicate when created.

The JoinStateWatermarkPredicate is used to remove old state.

The JoinStateWatermarkPredicate is also used to create a stateKeyWatermarkPredicateFunc and stateValueWatermarkPredicateFunc.
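How one optional predicate can yield two separate functions may be easier to see in a simplified model. In the sketch below every type is a hypothetical stand-in: the real predicates wrap Catalyst expressions, whereas here each wraps a plain threshold in milliseconds. The "never remove" default for a missing or non-matching predicate is an assumption of this sketch.

```scala
object WatermarkPredicateSketch {
  // Simplified model of the two predicate kinds (hypothetical types)
  sealed trait WatermarkPredicate
  final case class KeyWatermark(thresholdMs: Long) extends WatermarkPredicate
  final case class ValueWatermark(thresholdMs: Long) extends WatermarkPredicate

  // Toy rows: a key carries a window end time, a value carries an event time
  final case class KeyRow(windowEndMs: Long)
  final case class ValueRow(eventTimeMs: Long)

  // Key function: active only for a key predicate, otherwise never removes
  def keyPredicateFunc(p: Option[WatermarkPredicate]): KeyRow => Boolean = p match {
    case Some(KeyWatermark(t)) => (k: KeyRow) => k.windowEndMs <= t
    case _                     => (_: KeyRow) => false
  }

  // Value function: active only for a value predicate, otherwise never removes
  def valuePredicateFunc(p: Option[WatermarkPredicate]): ValueRow => Boolean = p match {
    case Some(ValueWatermark(t)) => (v: ValueRow) => v.eventTimeMs <= t
    case _                       => (_: ValueRow) => false
  }
}
```

At most one of the two functions can ever return `true` for a given predicate, which keeps state removal tied to a single predicate kind.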

SymmetricHashJoinStateManager

OneSideHashJoiner creates a SymmetricHashJoinStateManager when created.

Removing Old State

removeOldState(): Iterator[KeyToValuePair]

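removeOldState requests the SymmetricHashJoinStateManager for one of the following (based on the optional JoinStateWatermarkPredicate):

  • removeByKeyCondition with the stateKeyWatermarkPredicateFunc for a JoinStateKeyWatermarkPredicate
  • removeByValueCondition with the stateValueWatermarkPredicateFunc for a JoinStateValueWatermarkPredicate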

For all other cases, removeOldState returns an empty iterator.
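The dispatch on the optional predicate can be sketched with a toy in-memory state manager. Everything below is a simplified, hypothetical model: keys and values are plain `Long` timestamps, removal is eager rather than lazy, and the `StateManager` class only borrows the names of the state manager's removal methods.

```scala
import scala.collection.mutable

object RemoveOldStateSketch {
  // Simplified pair of a key and one of its values (both are timestamps here)
  final case class KeyToValuePair(key: Long, value: Long)

  // Hypothetical predicate kinds wrapping plain functions
  sealed trait WatermarkPredicate
  final case class KeyWatermark(isOld: Long => Boolean) extends WatermarkPredicate
  final case class ValueWatermark(isOld: Long => Boolean) extends WatermarkPredicate

  // Toy state manager: each key maps to the values stored for it
  class StateManager {
    private val store = mutable.LinkedHashMap.empty[Long, Vector[Long]]

    def append(key: Long, value: Long): Unit =
      store.update(key, store.getOrElse(key, Vector.empty) :+ value)

    def size: Int = store.valuesIterator.map(_.size).sum

    // Removes every value of every key that satisfies the condition
    def removeByKeyCondition(cond: Long => Boolean): Iterator[KeyToValuePair] = {
      val removed = for {
        k <- store.keys.toList if cond(k)
        v <- store(k)
      } yield KeyToValuePair(k, v)
      removed.map(_.key).distinct.foreach(k => store.remove(k))
      removed.iterator
    }

    // Removes only the values that satisfy the condition
    def removeByValueCondition(cond: Long => Boolean): Iterator[KeyToValuePair] = {
      val snapshot = store.toList
      val removed = for {
        (k, vs) <- snapshot
        v <- vs if cond(v)
      } yield KeyToValuePair(k, v)
      snapshot.foreach { case (k, vs) =>
        val kept = vs.filterNot(cond)
        if (kept.isEmpty) store.remove(k) else store.update(k, kept)
      }
      removed.iterator
    }
  }

  // Dispatch on the optional predicate; no predicate means nothing is removed
  def removeOldState(
      sm: StateManager,
      predicate: Option[WatermarkPredicate]): Iterator[KeyToValuePair] =
    predicate match {
      case Some(KeyWatermark(isOld))   => sm.removeByKeyCondition(isOld)
      case Some(ValueWatermark(isOld)) => sm.removeByValueCondition(isOld)
      case _                           => Iterator.empty
    }
}
```

The returned iterator carries the removed pairs, so a caller could still inspect the rows that were dropped from state.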


removeOldState is used when:

  • StreamingSymmetricHashJoinExec physical operator is executed

Looking Up Value Rows for Key

get(
  key: UnsafeRow): Iterator[UnsafeRow]

get requests the SymmetricHashJoinStateManager to retrieve value rows for the given key.
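The delegation can be pictured with a minimal sketch, assuming a toy state manager backed by an immutable map. `StateManager` and `Joiner` are hypothetical stand-ins that use `String` keys and values instead of UnsafeRows.

```scala
object GetSketch {
  // Toy state manager: looks up all value rows stored under a key
  class StateManager(state: Map[String, Seq[String]]) {
    def get(key: String): Iterator[String] =
      state.getOrElse(key, Seq.empty).iterator
  }

  // The joiner adds no logic of its own and simply forwards the lookup
  class Joiner(stateManager: StateManager) {
    def get(key: String): Iterator[String] = stateManager.get(key)
  }
}
```

In this sketch a key with no stored rows produces an empty iterator rather than an error.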


get is used when: