ProcessorStateManager¶
ProcessorStateManager is a StateManager.
Creating Instance¶
ProcessorStateManager takes the following to be created:
- TaskId
- TaskType
- eosEnabled flag
- LogContext
- StateDirectory
- ChangelogRegister
- storeToChangelogTopic collection
- Source TopicPartitions
ProcessorStateManager is created when:
- ActiveTaskCreator is requested to createTasks
- StandbyTaskCreator is requested to createTasks
- TopologyTestDriver is requested to setupTask
eosEnabled Flag¶
ProcessorStateManager is given the eosEnabled flag when created.
Offset Checkpoint File¶
When created, ProcessorStateManager requests the given StateDirectory for a checkpoint file for the given TaskId and creates a new OffsetCheckpoint.
ProcessorStateManager uses the OffsetCheckpoint for the following:
- initializeStoreOffsetsFromCheckpoint (to read the offsets and then, with eosEnabled, delete the checkpoint file)
- Checkpoint
- deleteCheckPointFileIfEOSEnabled
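The read-then-delete behaviour listed above can be illustrated with a small stand-alone model. This is a minimal sketch, not Kafka's OffsetCheckpoint: the class name CheckpointFileSketch, its methods, and the one-entry-per-line "partition offset" file layout are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the checkpoint file handling.
// The "<changelog partition> <offset>" line layout is an assumption of this sketch.
class CheckpointFileSketch {

    // Writes one "<changelog partition> <offset>" line per entry.
    static void write(Path file, Map<String, Long> offsets) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            lines.add(entry.getKey() + " " + entry.getValue());
        }
        try {
            Files.write(file, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the entries back; a missing file yields an empty map.
    static Map<String, Long> read(Path file) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        try {
            for (String line : Files.readAllLines(file)) {
                String trimmed = line.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                int sep = trimmed.lastIndexOf(' ');
                offsets.put(trimmed.substring(0, sep),
                        Long.parseLong(trimmed.substring(sep + 1)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    // Loads the offsets and, only when eosEnabled is set, deletes the file afterwards.
    static Map<String, Long> initializeOffsets(Path file, boolean eosEnabled) {
        Map<String, Long> offsets = read(file);
        if (eosEnabled) {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return offsets;
    }
}
```

In this model, calling initializeOffsets(file, false) leaves the file in place, while initializeOffsets(file, true) returns the same offsets and removes the file, so a later read finds nothing.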
Flushing State Stores¶
void flush()
flush does nothing (noop) when there are no state stores registered.
flush prints out the following DEBUG message to the logs:
Flushing all stores registered in the state manager: [stores]
For every state store, flush prints out the following TRACE message to the logs and requests the StateStore to flush cached data:
Flushing store [name]
flush is part of the StateManager abstraction.
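The flush steps described above can be sketched as follows. This is a simplified model rather than the Kafka source: FlushSketch, its nested Store interface (standing in for StateStore), and the in-memory log list (standing in for a real logger) are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the flush behaviour; not the actual ProcessorStateManager.
class FlushSketch {

    // Hypothetical minimal store abstraction (stands in for StateStore).
    interface Store {
        String name();
        void flush();
    }

    private final Map<String, Store> stores = new LinkedHashMap<>();

    // Captures log lines in memory instead of using a real logger.
    final List<String> log = new ArrayList<>();

    void register(Store store) {
        stores.put(store.name(), store);
    }

    void flush() {
        if (stores.isEmpty()) {
            return; // noop: nothing registered, nothing logged
        }
        log.add("DEBUG Flushing all stores registered in the state manager: " + stores.keySet());
        for (Store store : stores.values()) {
            log.add("TRACE Flushing store " + store.name());
            store.flush(); // ask the store to flush its cached data
        }
    }
}
```

With no stores registered, flush() returns without logging anything. After registering a single store, one flush() call records one DEBUG line, one TRACE line, and invokes that store's flush() once.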
Flushing Store Caches¶
void flushCache()
flushCache ...FIXME
flushCache is used when:
- StreamTask is requested to prepareCommit
Checkpointing¶
void checkpoint()
checkpoint finds all the persistent state stores (in the stores registry) that are logged (with a changelogPartition) and are not corrupted. For every such state store, checkpoint records the changelogPartition and the offset (in a local checkpointingOffsets collection).
checkpoint prints out the following DEBUG message to the logs:
Writing checkpoint: [checkpointingOffsets]
checkpoint requests the OffsetCheckpoint file to write out the offsets.
In case of any IO exceptions, checkpoint prints out the following WARN message to the logs:
Failed to write offset checkpoint file to [checkpointFile].
This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory.
This may also occur due to running multiple instances on the same machine using the same state dir.
Changing the location of state.dir may resolve the problem.
checkpoint is part of the StateManager abstraction.
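The store selection that checkpoint performs can be sketched as a simple filter. This is a minimal model under stated assumptions: CheckpointingSketch and StoreMetadata are hypothetical names, changelog partitions are plain strings, a null changelogPartition means the store is not logged, and the actual write to the checkpoint file is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how checkpointingOffsets is collected; not the Kafka source.
class CheckpointingSketch {

    // Hypothetical per-store metadata kept in the stores registry.
    static final class StoreMetadata {
        final boolean persistent;
        final String changelogPartition; // null when the store is not logged
        final long offset;
        final boolean corrupted;

        StoreMetadata(boolean persistent, String changelogPartition, long offset, boolean corrupted) {
            this.persistent = persistent;
            this.changelogPartition = changelogPartition;
            this.offset = offset;
            this.corrupted = corrupted;
        }
    }

    // Keeps only persistent, logged, non-corrupted stores and records their offsets.
    static Map<String, Long> checkpointingOffsets(List<StoreMetadata> stores) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (StoreMetadata store : stores) {
            if (store.persistent && store.changelogPartition != null && !store.corrupted) {
                offsets.put(store.changelogPartition, store.offset);
            }
        }
        return offsets;
    }
}
```

Given four stores of which one is in-memory, one is not logged, and one is corrupted, only the remaining store's changelog partition and offset end up in the returned map.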
registerStore¶
void registerStore(
StateStore store,
StateRestoreCallback stateRestoreCallback)
registerStore ...FIXME
registerStore is part of the StateManager abstraction.
registerStateStores¶
void registerStateStores(
List<StateStore> allStores,
InternalProcessorContext processorContext)
registerStateStores ...FIXME
registerStateStores is used when:
- StateManagerUtil is requested to registerStateStores
maybeRegisterStoreWithChangelogReader¶
void maybeRegisterStoreWithChangelogReader(
String storeName)
maybeRegisterStoreWithChangelogReader ...FIXME
maybeRegisterStoreWithChangelogReader is used when:
- ProcessorStateManager is requested to registerStateStores and registerStore
getStorePartition¶
TopicPartition getStorePartition(
String storeName)
getStorePartition creates a TopicPartition with the following:
- changelogFor with the given storeName for the name of the (changelog) topic
- The partition of the TaskId for the partition (of the changelog topic)
getStorePartition is used when:
- ProcessorStateManager is requested to maybeRegisterStoreWithChangelogReader and registerStore
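The way getStorePartition combines the changelog topic name with the task's partition can be sketched as below. This is an illustrative model only: StorePartitionSketch and its nested TopicPartition class are hypothetical stand-ins, and the task is reduced to its partition number.

```java
import java.util.Map;

// Simplified model of getStorePartition; not the actual ProcessorStateManager.
class StorePartitionSketch {

    // Hypothetical stand-in for Kafka's TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    private final int taskPartition; // the partition of the TaskId
    private final Map<String, String> storeToChangelogTopic;

    StorePartitionSketch(int taskPartition, Map<String, String> storeToChangelogTopic) {
        this.taskPartition = taskPartition;
        this.storeToChangelogTopic = storeToChangelogTopic;
    }

    // Looks up the changelog topic registered for the store (null if none).
    String changelogFor(String storeName) {
        return storeToChangelogTopic.get(storeName);
    }

    // Changelog topic name from changelogFor, partition from the task.
    TopicPartition getStorePartition(String storeName) {
        return new TopicPartition(changelogFor(storeName), taskPartition);
    }
}
```

For a task on partition 3 with store "counts" mapped to topic "app-counts-changelog", getStorePartition("counts") yields a partition whose string form is app-counts-changelog-3.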
initializeStoreOffsetsFromCheckpoint¶
void initializeStoreOffsetsFromCheckpoint(
boolean storeDirIsEmpty)
initializeStoreOffsetsFromCheckpoint ...FIXME
initializeStoreOffsetsFromCheckpoint is used when:
- StateManagerUtil is requested to registerStateStores
Logging¶
Enable ALL logging level for org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL
Refer to Logging.