
ProcessorStateManager

ProcessorStateManager is a StateManager.

Creating Instance

ProcessorStateManager takes the following to be created:

ProcessorStateManager is created when:

eosEnabled Flag

ProcessorStateManager is given the eosEnabled flag when created.

Offset Checkpoint File

When created, ProcessorStateManager requests the given StateDirectory for a checkpoint file for the given TaskId and creates a new OffsetCheckpoint.
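
To make the file lookup concrete, here is a minimal sketch of how a checkpoint file path could be resolved for a task. It assumes the usual Kafka Streams on-disk layout (state.dir, then the application ID, then the task ID, with a file named .checkpoint inside the task directory). CheckpointPathSketch and its method are hypothetical stand-ins, not the StateDirectory API, and the layout may differ between Kafka versions.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that mimics what a state directory could do; not Kafka's class.
class CheckpointPathSketch {
    // Assumed name of the per-task checkpoint file.
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    // Builds <stateDir>/<applicationId>/<taskId>/.checkpoint
    static Path checkpointFileFor(String stateDir, String applicationId, String taskId) {
        return Paths.get(stateDir, applicationId, taskId).resolve(CHECKPOINT_FILE_NAME);
    }

    public static void main(String[] args) {
        // "0_1" is an example task ID; the directory and application names are made up.
        System.out.println(checkpointFileFor("/tmp/kafka-streams", "my-app", "0_1"));
    }
}
```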

ProcessorStateManager uses the OffsetCheckpoint for the following:

Flushing State Stores

void flush()

flush does nothing (no-op) when there are no state stores registered.

Otherwise, flush prints out the following DEBUG message to the logs:

Flushing all stores registered in the state manager: [stores]

For every state store, flush prints out the following TRACE message to the logs and requests the StateStore to flush cached data:

Flushing store [name]
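
The flow above can be sketched roughly as follows. This is a simplified illustration, not the actual ProcessorStateManager code: SketchStore stands in for StateStore, a plain list stands in for the logger, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StateStore; only the two methods used here.
interface SketchStore {
    String name();
    void flush();
}

class FlushSketch {
    // Stand-in for the stores registry (store name -> store), in registration order.
    final Map<String, SketchStore> stores = new LinkedHashMap<>();
    // Stand-in for the logger so the messages can be inspected.
    final List<String> log = new ArrayList<>();

    void flush() {
        if (stores.isEmpty()) {
            return; // no-op when no state stores are registered
        }
        log.add("DEBUG Flushing all stores registered in the state manager: " + stores.keySet());
        for (SketchStore store : stores.values()) {
            log.add("TRACE Flushing store " + store.name());
            store.flush(); // ask the store to flush its cached data
        }
    }

    public static void main(String[] args) {
        FlushSketch manager = new FlushSketch();
        manager.stores.put("counts", new SketchStore() {
            public String name() { return "counts"; }
            public void flush() { System.out.println("counts flushed"); }
        });
        manager.flush();
        manager.log.forEach(System.out::println);
    }
}
```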

flush is part of the StateManager abstraction.

Flushing Store Caches

void flushCache()

flushCache...FIXME

flushCache is used when:

Checkpointing

void checkpoint()

checkpoint finds all the persistent state stores (in the stores registry) that are logged (with a changelogPartition) and are not corrupted. For every such state store, checkpoint records the changelogPartition and the offset (in a local checkpointingOffsets collection).

checkpoint prints out the following DEBUG message to the logs:

Writing checkpoint: [checkpointingOffsets]

checkpoint requests the OffsetCheckpoint file to write out the offsets.


In case of any IO exceptions, checkpoint prints out the following WARN message to the logs:

Failed to write offset checkpoint file to [checkpointFile].
This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory.
This may also occur due to running multiple instances on the same machine using the same state dir.
Changing the location of state.dir may resolve the problem.
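
As a rough illustration of the steps above, the sketch below filters the stores, collects the offsets, and logs a warning when the write fails. All names here (StoreMetadata, OffsetWriter, collectOffsets) are hypothetical stand-ins, the changelog partition is simplified to a string, and logging the failure instead of rethrowing it is an assumption of this sketch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CheckpointSketch {
    // Hypothetical per-store metadata; field names are illustrative only.
    static final class StoreMetadata {
        final boolean persistent;
        final String changelogPartition; // null when the store is not logged
        final Long offset;
        final boolean corrupted;

        StoreMetadata(boolean persistent, String changelogPartition, Long offset, boolean corrupted) {
            this.persistent = persistent;
            this.changelogPartition = changelogPartition;
            this.offset = offset;
            this.corrupted = corrupted;
        }
    }

    // Hypothetical stand-in for the checkpoint file's write operation.
    interface OffsetWriter {
        void write(Map<String, Long> offsets) throws IOException;
    }

    final List<String> log = new ArrayList<>(); // stand-in for the logger

    // Keeps only persistent, logged, non-corrupted stores.
    static Map<String, Long> collectOffsets(List<StoreMetadata> stores) {
        Map<String, Long> checkpointingOffsets = new LinkedHashMap<>();
        for (StoreMetadata store : stores) {
            if (store.persistent && store.changelogPartition != null && !store.corrupted) {
                checkpointingOffsets.put(store.changelogPartition, store.offset);
            }
        }
        return checkpointingOffsets;
    }

    void checkpoint(List<StoreMetadata> stores, OffsetWriter checkpointFile) {
        Map<String, Long> checkpointingOffsets = collectOffsets(stores);
        log.add("DEBUG Writing checkpoint: " + checkpointingOffsets);
        try {
            checkpointFile.write(checkpointingOffsets);
        } catch (IOException e) {
            // Assumption: the failure is only logged, not rethrown.
            log.add("WARN Failed to write offset checkpoint file: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        List<StoreMetadata> stores = new ArrayList<>();
        stores.add(new StoreMetadata(true, "my-app-counts-changelog-0", 42L, false));
        stores.add(new StoreMetadata(false, "my-app-cache-changelog-0", 7L, false)); // not persistent
        stores.add(new StoreMetadata(true, null, null, false));                      // not logged
        stores.add(new StoreMetadata(true, "my-app-sums-changelog-0", 9L, true));    // corrupted
        CheckpointSketch sketch = new CheckpointSketch();
        sketch.checkpoint(stores, offsets -> System.out.println("written: " + offsets));
        sketch.log.forEach(System.out::println);
    }
}
```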

checkpoint is part of the StateManager abstraction.

registerStore

void registerStore(
  StateStore store,
  StateRestoreCallback stateRestoreCallback)

registerStore...FIXME

registerStore is part of the StateManager abstraction.

registerStateStores

void registerStateStores(
  List<StateStore> allStores,
  InternalProcessorContext processorContext)

registerStateStores...FIXME

registerStateStores is used when:

maybeRegisterStoreWithChangelogReader

void maybeRegisterStoreWithChangelogReader(
  String storeName)

maybeRegisterStoreWithChangelogReader...FIXME

maybeRegisterStoreWithChangelogReader is used when:

getStorePartition

TopicPartition getStorePartition(
  String storeName)

getStorePartition creates a TopicPartition with the following:

  • changelogFor with the given storeName for the name of the (changelog) topic
  • The partition of the TaskId for the partition (of the changelog topic)
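
A minimal sketch of that lookup, assuming changelogFor is a simple map lookup from store name to changelog topic. TopicPartitionSketch is a hypothetical stand-in for Kafka's TopicPartition, and the map contents and topic name are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class StorePartitionSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartitionSketch {
        final String topic;
        final int partition;

        TopicPartitionSketch(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Assumed mapping of store names to changelog topic names.
    final Map<String, String> storeToChangelogTopic = new HashMap<>();
    // The partition number taken from the task ID.
    final int taskPartition;

    StorePartitionSketch(int taskPartition) {
        this.taskPartition = taskPartition;
    }

    String changelogFor(String storeName) {
        return storeToChangelogTopic.get(storeName);
    }

    // Changelog topic of the store, paired with the task's partition.
    TopicPartitionSketch getStorePartition(String storeName) {
        return new TopicPartitionSketch(changelogFor(storeName), taskPartition);
    }

    public static void main(String[] args) {
        StorePartitionSketch manager = new StorePartitionSketch(3);
        manager.storeToChangelogTopic.put("counts", "my-app-counts-changelog");
        System.out.println(manager.getStorePartition("counts"));
    }
}
```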

getStorePartition is used when:

initializeStoreOffsetsFromCheckpoint

void initializeStoreOffsetsFromCheckpoint(
  boolean storeDirIsEmpty)

initializeStoreOffsetsFromCheckpoint...FIXME

initializeStoreOffsetsFromCheckpoint is used when:

Logging

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL

Refer to Logging.
