ProcessorStateManager
ProcessorStateManager is a StateManager.
Creating Instance
ProcessorStateManager takes the following to be created:
- TaskId
- TaskType
- eosEnabled flag
- LogContext
- StateDirectory
- ChangelogRegister
- storeToChangelogTopic collection
- Source TopicPartitions
ProcessorStateManager is created when:
- ActiveTaskCreator is requested to createTasks
- StandbyTaskCreator is requested to createTasks
- TopologyTestDriver is requested to setupTask
eosEnabled Flag
ProcessorStateManager is given the eosEnabled flag (whether exactly-once semantics is enabled) when created.
Offset Checkpoint File
When created, ProcessorStateManager requests the given StateDirectory for the checkpoint file of the given TaskId and creates a new OffsetCheckpoint for that file.
ProcessorStateManager uses the OffsetCheckpoint for the following:
- initializeStoreOffsetsFromCheckpoint (to read the offsets and, with eosEnabled, delete the file afterwards)
- checkpoint
- deleteCheckPointFileIfEOSEnabled
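The read, write and delete cycle around the checkpoint file can be modelled with plain java.nio. What follows is a simplified, hypothetical sketch and not Kafka's OffsetCheckpoint: the names Partition, SimpleOffsetCheckpoint and CheckpointDemo, the text layout (a version line, an entry count, then one "topic partition offset" line per changelog partition), the .checkpoint file name and the write-to-temporary-file-then-move step are all assumptions made for illustration. loadOffsets mirrors what the list above says initializeStoreOffsetsFromCheckpoint does: read the offsets and, with eosEnabled, delete the file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CheckpointDemo {
    // Hypothetical helper: read the checkpointed offsets and, with EOS enabled,
    // delete the checkpoint file right after reading it.
    static Map<Partition, Long> loadOffsets(SimpleOffsetCheckpoint checkpoint, boolean eosEnabled)
            throws IOException {
        Map<Partition, Long> offsets = checkpoint.read();
        if (eosEnabled) {
            checkpoint.delete();
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("task-0_0");
        Path file = dir.resolve(".checkpoint"); // assumed file name
        SimpleOffsetCheckpoint checkpoint = new SimpleOffsetCheckpoint(file);
        checkpoint.write(Map.of(new Partition("my-app-counts-changelog", 0), 42L));
        System.out.println(loadOffsets(checkpoint, true)); // {Partition[topic=my-app-counts-changelog, partition=0]=42}
        System.out.println(Files.exists(file));            // false (deleted because eosEnabled)
    }
}

// Hypothetical stand-in for a changelog TopicPartition.
record Partition(String topic, int partition) {}

// Simplified, hypothetical model of an offset checkpoint file (not Kafka's OffsetCheckpoint).
class SimpleOffsetCheckpoint {
    private static final int VERSION = 0; // assumed format version
    private final Path file;

    SimpleOffsetCheckpoint(Path file) {
        this.file = file;
    }

    // Write everything to a temporary sibling file first, then move it over the
    // checkpoint file so the old checkpoint is only replaced by a complete one.
    void write(Map<Partition, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(String.valueOf(VERSION));
        lines.add(String.valueOf(offsets.size()));
        offsets.forEach((tp, offset) -> lines.add(tp.topic() + " " + tp.partition() + " " + offset));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    // Returns an empty map when there is no checkpoint file (e.g. a fresh task).
    Map<Partition, Long> read() throws IOException {
        Map<Partition, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        if (lines.size() < 2 || Integer.parseInt(lines.get(0).trim()) != VERSION) {
            throw new IOException("Unsupported checkpoint file: " + file);
        }
        int count = Integer.parseInt(lines.get(1).trim());
        if (lines.size() < 2 + count) {
            throw new IOException("Truncated checkpoint file: " + file);
        }
        for (int i = 2; i < 2 + count; i++) {
            String[] parts = lines.get(i).trim().split("\\s+");
            offsets.put(new Partition(parts[0], Integer.parseInt(parts[1])), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    void delete() throws IOException {
        Files.deleteIfExists(file);
    }
}
```

Reading a missing file as an empty map keeps the caller simple: a task without a checkpoint simply has no known offsets to start from.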
Flushing State Stores
void flush()
flush does nothing (noop) when there are no state stores registered.
flush prints out the following DEBUG message to the logs:
Flushing all stores registered in the state manager: [stores]
For every state store, flush prints out the following TRACE message to the logs and requests the StateStore to flush cached data:
Flushing store [name]
flush is part of the StateManager abstraction.
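The flush logic described above boils down to an emptiness check followed by a loop over the registered stores. The sketch below models it with a hypothetical SimpleStore interface (and a CountingStore test double) and records the DEBUG and TRACE messages in a list instead of a real logger. It illustrates the described behaviour under those assumptions; it is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlushSketch {
    // Registered stores by name (insertion order kept for predictable logging).
    final Map<String, SimpleStore> stores = new LinkedHashMap<>();
    // Stands in for the DEBUG/TRACE log output.
    final List<String> log = new ArrayList<>();

    void register(SimpleStore store) {
        stores.put(store.name(), store);
    }

    void flush() {
        if (stores.isEmpty()) {
            return; // noop: no state stores registered
        }
        log.add("DEBUG Flushing all stores registered in the state manager: " + stores.keySet());
        for (SimpleStore store : stores.values()) {
            log.add("TRACE Flushing store " + store.name());
            store.flush(); // the store writes out any cached data
        }
    }

    public static void main(String[] args) {
        FlushSketch manager = new FlushSketch();
        manager.register(new CountingStore("counts"));
        manager.register(new CountingStore("sessions"));
        manager.flush();
        manager.log.forEach(System.out::println);
    }
}

// Hypothetical minimal view of a state store.
interface SimpleStore {
    String name();
    void flush();
}

// Hypothetical store that only counts how often it was flushed.
class CountingStore implements SimpleStore {
    private final String name;
    int flushes = 0;

    CountingStore(String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }

    public void flush() {
        flushes++;
    }
}
```

Running main prints one DEBUG line listing [counts, sessions] followed by one TRACE line per store, in registration order.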
Flushing Store Caches
void flushCache()
flushCache...FIXME
flushCache is used when:
- StreamTask is requested to prepareCommit
Checkpointing
void checkpoint()
checkpoint finds all the persistent state stores (in the stores registry) that are logged (i.e. have a changelogPartition) and are not corrupted. For every such store, checkpoint records the changelogPartition and the store's offset in a local checkpointingOffsets collection.
checkpoint prints out the following DEBUG message to the logs:
Writing checkpoint: [checkpointingOffsets]
checkpoint requests the OffsetCheckpoint file to write out the offsets.
In case of any IO exceptions, checkpoint prints out the following WARN message to the logs:
Failed to write offset checkpoint file to [checkpointFile].
This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory.
This may also occur due to running multiple instances on the same machine using the same state dir.
Changing the location of state.dir may resolve the problem.
checkpoint is part of the StateManager abstraction.
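The filtering, logging and error handling described above can be sketched as follows. StoreMetadata, ChangelogPartition and CheckpointWriter are hypothetical stand-ins (a null changelogPartition plays the role of "not logged", and CheckpointWriter abstracts the OffsetCheckpoint file so an IO failure can be simulated); log lines are collected in a list rather than sent to a logger, and only the first line of the WARN message is reproduced. This is an illustrative model, not the Kafka Streams code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CheckpointSketch {
    // Stands in for the DEBUG and WARN log output.
    final List<String> log = new ArrayList<>();

    void checkpoint(List<StoreMetadata> stores, CheckpointWriter writer, String checkpointFile) {
        Map<ChangelogPartition, Long> checkpointingOffsets = new LinkedHashMap<>();
        for (StoreMetadata store : stores) {
            // Only persistent stores that are logged (have a changelog partition) and not corrupted.
            if (store.persistent() && store.changelogPartition() != null && !store.corrupted()) {
                checkpointingOffsets.put(store.changelogPartition(), store.offset());
            }
        }
        log.add("DEBUG Writing checkpoint: " + checkpointingOffsets);
        try {
            writer.write(checkpointingOffsets);
        } catch (IOException e) {
            // Only the first line of the WARN message is reproduced here.
            log.add("WARN Failed to write offset checkpoint file to " + checkpointFile + ".");
        }
    }

    public static void main(String[] args) {
        ChangelogPartition p0 = new ChangelogPartition("my-app-counts-changelog", 0);
        List<StoreMetadata> stores = List.of(
            new StoreMetadata("counts", true, p0, 42L, false),
            new StoreMetadata("in-memory", false, new ChangelogPartition("my-app-in-memory-changelog", 0), 7L, false),
            new StoreMetadata("not-logged", true, null, 9L, false));
        CheckpointSketch manager = new CheckpointSketch();
        manager.checkpoint(stores, offsets -> System.out.println("written: " + offsets), "/tmp/state/0_0/.checkpoint");
        manager.log.forEach(System.out::println);
    }
}

// Hypothetical stand-in for a changelog TopicPartition.
record ChangelogPartition(String topic, int partition) {}

// Hypothetical per-store metadata; a null changelogPartition means "not logged".
record StoreMetadata(String name, boolean persistent, ChangelogPartition changelogPartition,
                     long offset, boolean corrupted) {}

// Abstraction over the checkpoint file, so IO failures can be simulated.
interface CheckpointWriter {
    void write(Map<ChangelogPartition, Long> offsets) throws IOException;
}
```

Note that a failed write is only logged, so the caller carries on; the WARN text suggests the usual causes (a cleaned-up or shared state.dir).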
registerStore
void registerStore(
StateStore store,
StateRestoreCallback stateRestoreCallback)
registerStore...FIXME
registerStore is part of the StateManager abstraction.
registerStateStores
void registerStateStores(
List<StateStore> allStores,
InternalProcessorContext processorContext)
registerStateStores...FIXME
registerStateStores is used when:
- StateManagerUtil is requested to registerStateStores
maybeRegisterStoreWithChangelogReader
void maybeRegisterStoreWithChangelogReader(
String storeName)
maybeRegisterStoreWithChangelogReader...FIXME
maybeRegisterStoreWithChangelogReader is used when:
- ProcessorStateManager is requested to registerStateStores and registerStore
getStorePartition
TopicPartition getStorePartition(
String storeName)
getStorePartition creates a TopicPartition with the following:
- changelogFor the given storeName as the topic (the changelog topic)
- the partition of the TaskId as the partition (of the changelog topic)
getStorePartition is used when:
- ProcessorStateManager is requested to maybeRegisterStoreWithChangelogReader and registerStore
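In other words, a store's changelog partition is its changelog topic paired with the partition number of the task that owns the store, so task 1_3 uses partition 3 of its stores' changelog topics. The minimal sketch below uses hypothetical TaskIdSketch and TopicPartitionSketch records (not Kafka's TaskId and TopicPartition) and a plain map standing in for the storeToChangelogTopic collection; the example topic name my-app-counts-changelog follows the <application.id>-<store name>-changelog naming that Kafka Streams uses by default for changelog topics.

```java
import java.util.Map;

class StorePartitionSketch {
    private final TaskIdSketch taskId;
    // storeName -> changelog topic name (models the storeToChangelogTopic collection).
    private final Map<String, String> storeToChangelogTopic;

    StorePartitionSketch(TaskIdSketch taskId, Map<String, String> storeToChangelogTopic) {
        this.taskId = taskId;
        this.storeToChangelogTopic = storeToChangelogTopic;
    }

    // Models changelogFor: the changelog topic of a store, or null if the store is not logged.
    String changelogFor(String storeName) {
        return storeToChangelogTopic.get(storeName);
    }

    // Models getStorePartition: the store's changelog topic, partitioned like the task.
    TopicPartitionSketch getStorePartition(String storeName) {
        return new TopicPartitionSketch(changelogFor(storeName), taskId.partition());
    }

    public static void main(String[] args) {
        StorePartitionSketch manager = new StorePartitionSketch(
            new TaskIdSketch(1, 3), Map.of("counts", "my-app-counts-changelog"));
        System.out.println(manager.getStorePartition("counts"));
        // TopicPartitionSketch[topic=my-app-counts-changelog, partition=3]
    }
}

// Hypothetical stand-ins for Kafka's TaskId and TopicPartition.
record TaskIdSketch(int subtopology, int partition) {}
record TopicPartitionSketch(String topic, int partition) {}
```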
initializeStoreOffsetsFromCheckpoint
void initializeStoreOffsetsFromCheckpoint(
boolean storeDirIsEmpty)
initializeStoreOffsetsFromCheckpoint...FIXME
initializeStoreOffsetsFromCheckpoint is used when:
- StateManagerUtil is requested to registerStateStores
Logging
Enable ALL logging level for org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL
Refer to Logging.