

CommandStore is a CommandQueue that uses the CommandTopic for fetching commands.

CommandStore is used to create the following services while building a KsqlRestApplication:


Creating Instance

CommandStore takes the following to be created:

CommandStore is created using the Factory.create utility.


CommandStore is given a SequenceNumberFutureStore when created.

The SequenceNumberFutureStore is used when:

Kafka Consumer Properties

Map<String, Object> kafkaConsumerProperties

CommandStore is given kafkaConsumerProperties when created.

The properties are used to create a KafkaConsumer (Apache Kafka) that looks up the end offset in the command topic.

Kafka Producer properties

Map<String, Object> kafkaProducerProperties

CommandStore is given kafkaProducerProperties when created.

The properties are used to create a KafkaProducer (Apache Kafka) at createTransactionalProducer.


CommandStore is given a CommandTopic when created.

The CommandTopic is started in start and runs until close.

Used when:

Command Topic Name

CommandStore is given the name of the command topic when created.

The topic name is used when:

Partition 0

CommandStore uses just a single partition (0) of the command topic (commandTopicName), both when producing records in enqueueCommand and when looking up the end offset in getCommandTopicOffset.

Creating CommandStore

CommandStore create(
  KsqlConfig ksqlConfig,
  String commandTopicName,
  Duration commandQueueCatchupTimeout,
  Map<String, Object> kafkaConsumerProperties,
  Map<String, Object> kafkaProducerProperties,
  KafkaTopicClient internalTopicClient)

create adds the following configuration properties to the given kafkaConsumerProperties (possibly overriding the current values if set).

Configuration Property    Value
isolation.level           READ_COMMITTED
auto.offset.reset         none

create adds the following configuration properties to the given kafkaProducerProperties (possibly overriding the current values if set).

Configuration Property    Value
acks                      all
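
The effect of these overrides can be sketched with plain maps. This is only an illustration, not the ksqlDB code: the class and method names are hypothetical, the keys are written as string literals (the Kafka client also exposes them as constants), and the sketch returns a copy rather than changing the given map. The lowercase read_committed is the string form the Kafka consumer accepts for the READ_COMMITTED isolation level.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of ksqlDB: it only shows the effect of the overrides.
class CommandStoreConfigSketch {

  // Returns a copy of the given consumer properties with the two overrides applied.
  static Map<String, Object> withConsumerOverrides(Map<String, Object> given) {
    Map<String, Object> props = new HashMap<>(given);
    props.put("isolation.level", "read_committed"); // replaces any caller-supplied value
    props.put("auto.offset.reset", "none");
    return props;
  }

  // Returns a copy of the given producer properties with acks forced to "all".
  static Map<String, Object> withProducerOverrides(Map<String, Object> given) {
    Map<String, Object> props = new HashMap<>(given);
    props.put("acks", "all");
    return props;
  }

  public static void main(String[] args) {
    Map<String, Object> consumer = new HashMap<>();
    consumer.put("bootstrap.servers", "localhost:9092"); // assumed example value
    consumer.put("auto.offset.reset", "earliest");       // will be overridden
    System.out.println(withConsumerOverrides(consumer));
  }
}
```

Any value the caller set for one of these keys is lost, while unrelated entries such as bootstrap.servers pass through unchanged.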

create creates a CommandTopicBackup (based on ksql.metastore.backup.location).

In the end, create creates a CommandStore with the following:

  • The given commandTopicName
  • A new CommandTopic
  • A new SequenceNumberFutureStore
  • others

create is used when:

Starting Up

void start()

start requests the CommandTopic to start.

start is used when:


QueuedCommandStatus enqueueCommand(
  CommandId commandId,
  Command command,
  Producer<CommandId, Command> transactionalProducer)

enqueueCommand is part of the CommandQueue abstraction.

enqueueCommand creates a ProducerRecord (Apache Kafka) as follows:

enqueueCommand requests the given transactionalProducer to send the record.

enqueueCommand returns a QueuedCommandStatus with the record offset (and a CommandStatusFuture).

Fetching New Commands

List<QueuedCommand> getNewCommands(
  Duration timeout)

getNewCommands is part of the CommandQueue abstraction.

getNewCommands first calls completeSatisfiedSequenceNumberFutures.

getNewCommands requests the CommandTopic for new commands (ConsumerRecord<byte[], byte[]>s).

getNewCommands creates a QueuedCommand for every new command with a non-null value.

getNewCommands returns the QueuedCommands.
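
The filtering step (one QueuedCommand per record with a non-null value) could look roughly like the sketch below. RawRecord and Queued are hypothetical stand-ins for ConsumerRecord<byte[], byte[]> and QueuedCommand, so the example compiles without the Kafka or ksqlDB libraries; the real classes carry more than these three fields.

```java
import java.util.ArrayList;
import java.util.List;

class NewCommandsSketch {

  // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
  static final class RawRecord {
    final byte[] key;
    final byte[] value;
    final long offset;

    RawRecord(byte[] key, byte[] value, long offset) {
      this.key = key;
      this.value = value;
      this.offset = offset;
    }
  }

  // Hypothetical stand-in for QueuedCommand.
  static final class Queued {
    final byte[] key;
    final byte[] value;
    final long offset;

    Queued(RawRecord record) {
      this.key = record.key;
      this.value = record.value;
      this.offset = record.offset;
    }
  }

  // Keeps the order of the input and skips records whose value is null.
  static List<Queued> toQueuedCommands(List<RawRecord> records) {
    List<Queued> out = new ArrayList<>();
    for (RawRecord record : records) {
      if (record.value != null) {
        out.add(new Queued(record));
      }
    }
    return out;
  }
}
```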


void completeSatisfiedSequenceNumberFutures()

completeSatisfiedSequenceNumberFutures requests the CommandTopic for the current consumer position and then requests the SequenceNumberFutureStore to complete futures up to this position.

Creating Transactional Kafka Producer

Producer<CommandId, Command> createTransactionalProducer()

createTransactionalProducer is part of the CommandQueue abstraction.

createTransactionalProducer creates a KafkaProducer (Apache Kafka) with the following:


void ensureConsumedPast(
  long seqNum,
  Duration timeout)

ensureConsumedPast is part of the CommandQueue abstraction.

ensureConsumedPast requests the SequenceNumberFutureStore for a CompletableFuture for the given sequence number.

In the end, ensureConsumedPast waits for this future to complete (if necessary) for at most the given timeout.

Completing CompletableFuture

CompletableFutures are completed in completeSatisfiedSequenceNumberFutures.
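
The interplay between waiting on a sequence number and completing futures as the consumer advances can be sketched with CompletableFuture alone. SequenceFuturesSketch below is a hypothetical stand-in, not the actual SequenceNumberFutureStore: its method names, the locking, and the exact boundary (whether the position itself counts as consumed) are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for SequenceNumberFutureStore; names and details are assumptions.
class SequenceFuturesSketch {
  private final Map<Long, CompletableFuture<Void>> futures = new HashMap<>();
  private long lastCompleted = -1;

  // One shared future per sequence number; numbers already passed complete immediately.
  synchronized CompletableFuture<Void> futureFor(long seqNum) {
    if (seqNum <= lastCompleted) {
      return CompletableFuture.completedFuture(null);
    }
    return futures.computeIfAbsent(seqNum, k -> new CompletableFuture<>());
  }

  // Completes and forgets every future whose sequence number is <= seqNum.
  synchronized void completeUpTo(long seqNum) {
    lastCompleted = Math.max(lastCompleted, seqNum);
    futures.entrySet().removeIf(entry -> {
      if (entry.getKey() <= seqNum) {
        entry.getValue().complete(null);
        return true;
      }
      return false;
    });
  }

  // Blocks (outside the lock) until seqNum is completed or the timeout elapses.
  void ensureConsumedPast(long seqNum, Duration timeout)
      throws InterruptedException, TimeoutException {
    try {
      futureFor(seqNum).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```

In this sketch a caller that asks for an already-consumed sequence number returns at once, while a caller ahead of the consumer blocks until another thread calls completeUpTo or a TimeoutException is thrown.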


void waitForCommandConsumer()

waitForCommandConsumer is part of the CommandQueue abstraction.

waitForCommandConsumer calls ensureConsumedPast (with the end offset in the command topic and commandQueueCatchupTimeout).

End Offset in Command Topic

long getCommandTopicOffset()

getCommandTopicOffset creates a TopicPartition for the command topic (with the name and partition 0).

getCommandTopicOffset creates a KafkaConsumer (Apache Kafka) with the consumer properties (and ByteArrayDeserializers for the keys and values).

getCommandTopicOffset assigns the KafkaConsumer to the TopicPartition only (KafkaConsumer.assign) and then requests the end offset of that partition (KafkaConsumer.endOffsets).


Enable ALL logging level for logger to see what happens inside.

Add the following line to

Refer to Logging.