Skip to content

InputProcessor

InputProcessor is a helper class that is used to update state (in the state store) of a single partition of a FlatMapGroupsWithStateExec physical operator.

Creating Instance

InputProcessor takes the following to be created:

InputProcessor is created when FlatMapGroupsWithStateExec physical operator is executed (for storeUpdateFunction while processing rows per partition with a corresponding per-partition state store).

StateStore

InputProcessor is given a StateStore when created.

The StateStore manages the per-group state (and is used when processing new data and timed-out state data, and in the "all rows processed" callback).

Processing New Data

processNewData(
  dataIter: Iterator[InternalRow]): Iterator[InternalRow]

processNewData creates a grouped iterator of (of pairs of) per-group state keys and the row values from the given data iterator (dataIter) with the grouping attributes and the output schema of the child operator (of the parent FlatMapGroupsWithStateExec physical operator).

For every per-group state key (in the grouped iterator), processNewData requests the StateManager (of the parent FlatMapGroupsWithStateExec physical operator) to get the state (from the StateStore) and callFunctionAndUpdateState (with the hasTimedOut flag off).

processNewData is used when FlatMapGroupsWithStateExec physical operator is executed.

Processing Timed-Out State Data

processTimedOutState(): Iterator[InternalRow]

processTimedOutState does nothing and simply returns an empty iterator for GroupStateTimeout.NoTimeout.

With timeout enabled, processTimedOutState gets the current timeout threshold per GroupStateTimeout:

processTimedOutState creates an iterator of timed-out state data by requesting the StateManager for all the available state data (in the StateStore) and takes only the state data with timeout defined and below the current timeout threshold.

In the end, for every timed-out state data, processTimedOutState callFunctionAndUpdateState (with the hasTimedOut flag on).

processTimedOutState is used when FlatMapGroupsWithStateExec physical operator is executed.

callFunctionAndUpdateState Internal Method

callFunctionAndUpdateState(
  stateData: StateData,
  valueRowIter: Iterator[InternalRow],
  hasTimedOut: Boolean): Iterator[InternalRow]

callFunctionAndUpdateState is used when InputProcessor is requested to process new data and timed-out state data with the given hasTimedOut flag is off and on, respectively.

callFunctionAndUpdateState creates a key object by requesting the given StateData for the UnsafeRow of the key (keyRow) and converts it to an object (using the internal state key converter).

callFunctionAndUpdateState creates value objects by taking every value row (from the given valueRowIter iterator) and converts them to objects (using the internal state value converter).

callFunctionAndUpdateState creates a new GroupStateImpl with the following:

  • The current state value (of the given StateData) that could possibly be null

  • The batchTimestampMs of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)

  • The event-time watermark of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)

  • The GroupStateTimeout of the parent FlatMapGroupsWithStateExec operator

  • The watermarkPresent flag of the parent FlatMapGroupsWithStateExec operator

  • The given hasTimedOut flag

callFunctionAndUpdateState then executes the user-defined state function (of the parent FlatMapGroupsWithStateExec operator) on the key object, value objects, and the newly-created GroupStateImpl.

For every output value from the user-defined state function, callFunctionAndUpdateState updates numOutputRows performance metric and wraps the values to an internal row (using the internal output value converter).

In the end, callFunctionAndUpdateState returns a Iterator[InternalRow] which calls the completion function right after rows have been processed (so the iterator is considered fully consumed).

"All Rows Processed" Callback

onIteratorCompletion: Unit

onIteratorCompletion branches off per whether the GroupStateImpl has been marked removed and no timeout timestamp is specified or not.

When the GroupStateImpl has been marked removed and no timeout timestamp is specified, onIteratorCompletion does the following:

. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to remove the state (from the StateStore for the key row of the given StateData)

. Increments the numUpdatedStateRows performance metric

Otherwise, when the GroupStateImpl has not been marked removed or the timeout timestamp is specified, onIteratorCompletion checks whether the timeout timestamp has changed by comparing the timeout timestamps of the GroupStateImpl and the given StateData.

(only when the GroupStateImpl has been updated, removed or the timeout timestamp changed) onIteratorCompletion does the following:

. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to persist the state (in the StateStore with the key row, updated state object, and the timeout timestamp of the given StateData)

. Increments the numUpdatedStateRows performance metrics

onIteratorCompletion is used when InputProcessor is requested to callFunctionAndUpdateState (right after rows have been processed)

Converters

Output Value Converter

An output value converter (of type Any => InternalRow) to wrap a given output value (from the user-defined state function) to a row

  • The data type of the row is specified as the data type of the output object attribute when the parent FlatMapGroupsWithStateExec operator is created

Used when InputProcessor is requested to callFunctionAndUpdateState.

State Key Converter

A state key converter (of type InternalRow => Any) to deserialize a given row (for a per-group state key) to the current state value

  • The deserialization expression for keys is specified as the key deserializer expression when the parent FlatMapGroupsWithStateExec operator is created

  • The data type of state keys is specified as the grouping attributes when the parent FlatMapGroupsWithStateExec operator is created

Used when InputProcessor is requested to callFunctionAndUpdateState.

State Value Converter

A state value converter (of type InternalRow => Any) to deserialize a given row (for a per-group state value) to a Scala value

  • The deserialization expression for values is specified as the value deserializer expression when the parent FlatMapGroupsWithStateExec operator is created

  • The data type of state values is specified as the data attributes when the parent FlatMapGroupsWithStateExec operator is created

Used when InputProcessor is requested to callFunctionAndUpdateState.