InputProcessor
InputProcessor is a helper class that is used to update the state (in a state store) of a single partition of a FlatMapGroupsWithStateExec physical operator.
Creating Instance
InputProcessor takes the following to be created:
- StateStore
InputProcessor is created when the FlatMapGroupsWithStateExec physical operator is executed (in storeUpdateFunction, which processes the rows of every partition with the corresponding per-partition state store).
StateStore
InputProcessor is given a StateStore when created.
The StateStore manages the per-group state and is used when processing new data, when processing timed-out state data, and in the "All Rows Processed" callback.
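As a rough illustration of what "per-group state" means, the sketch below models the state operations mentioned on this page (getting the state of a key, getting all the state, persisting and removing state) over an in-memory map. It is not Spark code: StateData here is a simplified stand-in, InMemoryStateManager is hypothetical, keys and state values are plain Scala values instead of UnsafeRows, and -1 stands for "no timeout timestamp".

```scala
import scala.collection.mutable

// Simplified stand-in for StateData: a key, an optional state value and a timeout timestamp.
final case class StateData(key: String, state: Option[Long], timeoutTimestamp: Long)

// Hypothetical per-group state manager backed by an in-memory map
// (standing in for a per-partition StateStore).
final class InMemoryStateManager {
  // -1 means "no timeout timestamp"
  val NoTimestamp: Long = -1L

  private val store = mutable.Map.empty[String, (Long, Long)] // key -> (state, timeout)

  // Unknown keys yield StateData with no state value and no timeout timestamp.
  def getState(key: String): StateData = store.get(key) match {
    case Some((state, timeout)) => StateData(key, Some(state), timeout)
    case None                   => StateData(key, None, NoTimestamp)
  }

  // Persist (insert or overwrite) the state of a key together with its timeout timestamp.
  def putState(key: String, state: Long, timeoutTimestamp: Long): Unit =
    store.update(key, (state, timeoutTimestamp))

  // Drop the state of a key.
  def removeState(key: String): Unit = store.remove(key)

  // Iterate over the state of all keys (e.g. to look for timed-out groups).
  def getAllState: Iterator[StateData] =
    store.iterator.map { case (key, (state, timeout)) => StateData(key, Some(state), timeout) }
}
```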
Processing New Data
processNewData(
  dataIter: Iterator[InternalRow]): Iterator[InternalRow]
processNewData creates a grouped iterator of pairs of per-group keys and the rows of every group from the given data iterator (dataIter), using the grouping attributes and the output schema of the child operator of the parent FlatMapGroupsWithStateExec physical operator.
For every per-group key in the grouped iterator, processNewData requests the StateManager (of the parent FlatMapGroupsWithStateExec physical operator) to get the state of the key (from the StateStore) and then calls callFunctionAndUpdateState with that state, the rows of the group, and the hasTimedOut flag off.
processNewData is used when the FlatMapGroupsWithStateExec physical operator is executed.
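A minimal plain-Scala sketch of processNewData follows, assuming (as with a grouped iterator over sorted input) that rows with the same key arrive next to each other. Grouping, groupConsecutive and the getState and callFunction parameters are hypothetical names; for simplicity, each group is materialized into a List rather than exposed as a lazy iterator.

```scala
// Hypothetical sketch of processNewData in plain Scala (no Spark types).
object Grouping {
  // Groups consecutive (key, value) pairs with equal keys; like a grouped iterator over
  // sorted input, it assumes rows of the same key arrive next to each other.
  def groupConsecutive[K, V](rows: Iterator[(K, V)]): Iterator[(K, List[V])] = {
    val buffered = rows.buffered
    new Iterator[(K, List[V])] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (K, List[V]) = {
        val key = buffered.head._1
        val values = List.newBuilder[V]
        while (buffered.hasNext && buffered.head._1 == key) values += buffered.next()._2
        (key, values.result())
      }
    }
  }

  // For every group: look up its state and call the per-group function with hasTimedOut = false.
  def processNewData[K, V, S, O](
      rows: Iterator[(K, V)],
      getState: K => Option[S],
      callFunction: (K, Option[S], Iterator[V], Boolean) => Iterator[O]): Iterator[O] =
    groupConsecutive(rows).flatMap { case (key, values) =>
      callFunction(key, getState(key), values.iterator, false)
    }
}
```

With input ("a", 1), ("a", 2), ("b", 3), the function is called twice: once for key "a" with values 1 and 2, and once for key "b" with value 3.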
Processing Timed-Out State Data
processTimedOutState(): Iterator[InternalRow]
processTimedOutState does nothing and simply returns an empty iterator when the timeout is GroupStateTimeout.NoTimeout.
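With a timeout enabled, processTimedOutState determines the current timeout threshold per GroupStateTimeout:
- ProcessingTimeTimeout: the batchTimestampMs of the parent FlatMapGroupsWithStateExec operator
- EventTimeTimeout: the event-time watermark of the parent FlatMapGroupsWithStateExec operator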
processTimedOutState then creates an iterator of timed-out state data by requesting the StateManager for all the state data available (in the StateStore) and keeping only the state data with a timeout timestamp defined and below (strictly less than) the current timeout threshold.
In the end, for every timed-out state data, processTimedOutState calls callFunctionAndUpdateState with no values (an empty value iterator) and the hasTimedOut flag on.
processTimedOutState is used when the FlatMapGroupsWithStateExec physical operator is executed.
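The threshold selection and the filter described above can be sketched in plain Scala as follows. TimeoutMode, StateEntry and TimedOut are hypothetical stand-ins for GroupStateTimeout and StateData, and -1 marks state without a timeout timestamp. As a simplification, the sketch treats a missing threshold (e.g. no batch timestamp) as "nothing timed out".

```scala
// Hypothetical model of the timeout modes (standing in for GroupStateTimeout).
sealed trait TimeoutMode
case object NoTimeout extends TimeoutMode
case object ProcessingTimeTimeout extends TimeoutMode
case object EventTimeTimeout extends TimeoutMode

// Simplified stand-in for StateData: only the key and its timeout timestamp matter here.
final case class StateEntry(key: String, timeoutTimestamp: Long)

object TimedOut {
  val NoTimestamp: Long = -1L

  // Returns the keys whose state has timed out; empty when timeouts are disabled.
  def timedOutKeys(
      mode: TimeoutMode,
      batchTimestampMs: Option[Long],
      eventTimeWatermarkMs: Option[Long],
      allState: Iterator[StateEntry]): Iterator[String] = {
    // Pick the threshold per timeout mode.
    val threshold: Option[Long] = mode match {
      case NoTimeout             => None
      case ProcessingTimeTimeout => batchTimestampMs
      case EventTimeTimeout      => eventTimeWatermarkMs
    }
    threshold match {
      case None => Iterator.empty
      case Some(t) =>
        // Keep only state with a timeout timestamp defined and strictly below the threshold.
        allState
          .filter(s => s.timeoutTimestamp != NoTimestamp && s.timeoutTimestamp < t)
          .map(_.key)
    }
  }
}
```

Note that a timeout timestamp equal to the threshold does not count as timed out.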
callFunctionAndUpdateState Internal Method
callFunctionAndUpdateState(
  stateData: StateData,
  valueRowIter: Iterator[InternalRow],
  hasTimedOut: Boolean): Iterator[InternalRow]
callFunctionAndUpdateState is used when InputProcessor is requested to process new data and timed-out state data, with the hasTimedOut flag off and on, respectively.
callFunctionAndUpdateState creates a key object by requesting the given StateData for the UnsafeRow of the key (keyRow) and converting it to an object (using the internal state key converter).
callFunctionAndUpdateState creates value objects by converting every value row (from the given valueRowIter iterator) to an object (using the internal state value converter).
callFunctionAndUpdateState creates a new GroupStateImpl with the following:
- The current state value (of the given StateData) that could possibly be null
- The batchTimestampMs of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)
- The event-time watermark of the parent FlatMapGroupsWithStateExec operator (that could possibly be -1)
- The GroupStateTimeout of the parent FlatMapGroupsWithStateExec operator
- The watermarkPresent flag of the parent FlatMapGroupsWithStateExec operator
- The given hasTimedOut flag
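The "could possibly be null" and "could possibly be -1" notes above amount to a simple normalization: a null state value becomes an absent Option, and an undefined batch timestamp or event-time watermark becomes -1. The sketch below uses a hypothetical GroupStateInputs case class and GroupStateInputsBuilder object (not Spark's GroupStateImpl) and leaves out the timeout and watermarkPresent settings.

```scala
// Hypothetical container for some of the inputs of a per-group state handle.
final case class GroupStateInputs[S](
    currentState: Option[S],
    batchTimestampMs: Long,
    eventTimeWatermarkMs: Long,
    hasTimedOut: Boolean)

object GroupStateInputsBuilder {
  // -1 means "not defined"
  val NoTimestamp: Long = -1L

  // A null state value becomes None; an undefined batch timestamp or watermark becomes -1.
  def build[S](
      stateObj: S,
      batchTimestampMs: Option[Long],
      eventTimeWatermarkMs: Option[Long],
      hasTimedOut: Boolean): GroupStateInputs[S] =
    GroupStateInputs(
      Option(stateObj), // Option(null) is None
      batchTimestampMs.getOrElse(NoTimestamp),
      eventTimeWatermarkMs.getOrElse(NoTimestamp),
      hasTimedOut)
}
```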
callFunctionAndUpdateState then executes the user-defined state function (of the parent FlatMapGroupsWithStateExec operator) on the key object, the value objects, and the newly-created GroupStateImpl.
For every output value from the user-defined state function, callFunctionAndUpdateState increments the numOutputRows performance metric and converts the value to an internal row (using the internal output value converter).
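Because the conversion is applied by mapping over an iterator, a counter like numOutputRows grows only as output rows are actually consumed. A minimal sketch with a hypothetical OutputMapper class, using a local counter in place of a Spark SQL metric and a plain function in place of the output value converter:

```scala
// Hypothetical sketch: map user-function outputs to "rows" while counting them.
// Iterator.map is lazy, so the counter is updated only as the result is consumed.
final class OutputMapper[O, R](toRow: O => R) {
  private var numOutputRows: Long = 0L

  def outputRows: Long = numOutputRows

  def mapOutputs(outputs: Iterator[O]): Iterator[R] =
    outputs.map { o =>
      numOutputRows += 1 // count every output value
      toRow(o)           // convert it to a row
    }
}
```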
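In the end, callFunctionAndUpdateState returns an Iterator[InternalRow] (a CompletionIterator) that calls the "All Rows Processed" callback (onIteratorCompletion) once all rows have been processed, i.e. once the iterator has been fully consumed. The state in the StateStore is therefore updated lazily, only after the output rows of a group have been consumed.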
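The lazy state update relies on an iterator that runs a callback once it is exhausted. OnCompletionIterator below is a hypothetical, simplified sketch of that idea (Spark has its own CompletionIterator utility; this is not it): the callback runs exactly once, the first time hasNext returns false.

```scala
// Hypothetical sketch of an iterator that runs a callback exactly once,
// right after the underlying iterator has been exhausted.
final class OnCompletionIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  def hasNext: Boolean = {
    val more = underlying.hasNext
    // First time we observe exhaustion: fire the callback (and never again).
    if (!more && !completed) {
      completed = true
      onCompletion()
    }
    more
  }

  def next(): A = underlying.next()
}
```

If a consumer never drains the iterator, the callback never runs, which is why the state update only happens after the output rows have been consumed.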
"All Rows Processed" Callback
onIteratorCompletion: Unit
onIteratorCompletion branches based on whether or not the GroupStateImpl has been marked removed with no timeout timestamp specified.
When the GroupStateImpl has been marked removed and no timeout timestamp is specified, onIteratorCompletion does the following:
1. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to remove the state (from the StateStore) for the key row of the given StateData
2. Increments the numUpdatedStateRows performance metric
Otherwise, i.e. when the GroupStateImpl has not been marked removed or a timeout timestamp is specified, onIteratorCompletion checks whether the timeout timestamp has changed by comparing the timeout timestamp of the GroupStateImpl with that of the given StateData.
Only when the GroupStateImpl has been updated or removed, or the timeout timestamp has changed, onIteratorCompletion does the following:
1. Requests the StateManager (of the parent FlatMapGroupsWithStateExec operator) to persist the state in the StateStore with the key row of the given StateData, the updated state object (null when the GroupStateImpl no longer holds a value), and the current timeout timestamp of the GroupStateImpl
2. Increments the numUpdatedStateRows performance metric
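The branching of onIteratorCompletion can be summarized as a pure decision function. The sketch below is hypothetical (StateAction, RemoveState, PutState, KeepAsIs and Completion are not Spark names), leaves the numUpdatedStateRows metric out, and uses -1 for "no timeout timestamp".

```scala
// Hypothetical model of what happens to the state of a group once its output is consumed.
sealed trait StateAction
case object RemoveState extends StateAction
final case class PutState(state: Option[Long], timeoutTimestamp: Long) extends StateAction
case object KeepAsIs extends StateAction

object Completion {
  val NoTimestamp: Long = -1L

  def decide(
      state: Option[Long],  // current value of the group state (None if absent)
      isUpdated: Boolean,
      isRemoved: Boolean,
      newTimeout: Long,     // timeout timestamp of the group state, -1 if unset
      oldTimeout: Long      // timeout timestamp of the given StateData
  ): StateAction =
    if (isRemoved && newTimeout == NoTimestamp) RemoveState
    else {
      val hasTimeoutChanged = newTimeout != oldTimeout
      // Persist only when something changed; an absent value is stored as "no value".
      if (isUpdated || isRemoved || hasTimeoutChanged) PutState(state, newTimeout)
      else KeepAsIs
    }
}
```

A removed state that still has a timeout timestamp is persisted without a value, so that the timeout can still fire later.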
onIteratorCompletion is used when InputProcessor is requested to callFunctionAndUpdateState (right after all rows have been processed).
Converters
Output Value Converter
An output value converter (of type Any => InternalRow) to wrap a given output value (from the user-defined state function) into a row
- The data type of the row is specified as the data type of the output object attribute when the parent FlatMapGroupsWithStateExec operator is created
Used when InputProcessor is requested to callFunctionAndUpdateState.
State Key Converter
A state key converter (of type InternalRow => Any) to deserialize a given row (for a per-group state key) to a key object
- The deserialization expression for keys is specified as the key deserializer expression when the parent FlatMapGroupsWithStateExec operator is created
- The data type of state keys is specified as the grouping attributes when the parent FlatMapGroupsWithStateExec operator is created
Used when InputProcessor is requested to callFunctionAndUpdateState.
State Value Converter
A state value converter (of type InternalRow => Any) to deserialize a given row (a value row of a group) to an object
- The deserialization expression for values is specified as the value deserializer expression when the parent FlatMapGroupsWithStateExec operator is created
- The data type of the value rows is specified as the data attributes when the parent FlatMapGroupsWithStateExec operator is created
Used when InputProcessor is requested to callFunctionAndUpdateState.
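To make the three function types concrete, here is a hypothetical sketch that models a row as a Vector of field values instead of an InternalRow, and hand-writes the conversions that the parent operator builds from its deserializer expressions and output object type. Event, Summary and the converter values are illustrative only.

```scala
// Hypothetical row model: a row is a Vector of field values (Spark uses InternalRow).
object Converters {
  type Row = Vector[Any]

  final case class Event(user: String, clicks: Int)   // input value type (illustrative)
  final case class Summary(user: String, total: Long) // output value type (illustrative)

  // State key converter (Row => key object): the key row holds the grouping column.
  val keyConverter: Row => String = row => row(0).asInstanceOf[String]

  // State value converter (Row => value object): a value row holds the data columns.
  val valueConverter: Row => Event =
    row => Event(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])

  // Output value converter (Any => Row): wraps an object returned by the user function.
  val outputConverter: Any => Row = {
    case Summary(user, total) => Vector(user, total)
    case other                => throw new IllegalArgumentException(s"Unexpected output: $other")
  }
}
```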