Skip to content


ProcessorStateManager is a StateManager.

Creating Instance

ProcessorStateManager takes the following to be created:

ProcessorStateManager is created when:

eosEnabled Flag

ProcessorStateManager is given eosEnabled flag when created.

Offset Checkpoint File

When created, ProcessorStateManager requests the given StateDirectory for a checkpoint file for the given TaskId and creates a new OffsetCheckpoint.

ProcessorStateManager uses the OffsetCheckpoint for the following:

Flushing State Stores

void flush()

flush does nothing (noop) when there are no state stores registered.

flush prints out the following DEBUG message to the logs:

Flushing all stores registered in the state manager: [stores]

For every state store, flush prints out the following TRACE message to the logs and requests the StateStore to flush cached data:

Flushing store [name]

flush is part of the StateManager abstraction.

Flushing Store Caches

void flushCache()


flushCache is used when:


void checkpoint()

checkpoint finds all the persistent state stores (in the stores registry) that are logged (with a changelogPartition) and are not corrupted. For every state store, checkpoint records the changelogPartition and the offset (in a local checkpointingOffsets collection).

checkpoint prints out the following DEBUG message to the logs:

Writing checkpoint: [checkpointingOffsets]

checkpoint requests the OffsetCheckpoint file to write out the offsets.

In case of any IO exceptions, checkpoint prints out the following WARN message to the logs:

Failed to write offset checkpoint file to [checkpointFile].
This may occur if OS cleaned the state.dir in case when it located in ${} directory.
This may also occur due to running multiple instances on the same machine using the same state dir.
Changing the location of state.dir may resolve the problem.

checkpoint is part of the StateManager abstraction.


void registerStore(
  StateStore store,
  StateRestoreCallback stateRestoreCallback)


registerStore is part of the StateManager abstraction.


void registerStateStores(
  List<StateStore> allStores, 
  InternalProcessorContext processorContext)


registerStateStores is used when:


void maybeRegisterStoreWithChangelogReader(
  String storeName)


maybeRegisterStoreWithChangelogReader is used when:


TopicPartition getStorePartition(
  String storeName)

getStorePartition creates a TopicPartition with the following:

  • changelogFor with the given storeName for the name of the (changelog) topic
  • The partition of the TaskId for the partition (of the changelog topic)

getStorePartition is used when:


void initializeStoreOffsetsFromCheckpoint(
  boolean storeDirIsEmpty)


initializeStoreOffsetsFromCheckpoint is used when:


Enable ALL logging level for org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.

Add the following line to

Refer to Logging.

Back to top