CommandTopic manages a KafkaConsumer to fetch new commands (for CommandStore) from the internal command topic.

CommandTopic and CommandStore


KsqlRestApplication uses the commandTopic utility to build the name of the command topic.

Creating Instance

CommandTopic takes the following to be created:

  • Name of the internal command topic
  • KafkaConsumer Properties
  • CommandTopicBackup

CommandTopic is created when:


CommandTopic can be given a Consumer<byte[], byte[]> or properties to create one when created.

The Consumer is assigned the commandTopicPartition in start.

The Consumer fetches ConsumerRecord<byte[], byte[]>s when CommandTopic is requested for new commands.


CommandTopic creates a Kafka TopicPartition for the commandTopicName and partition 0 when created.

The commandTopicPartition is assigned to commandConsumer in start.

Starting Up

void start()

start requests the CommandTopicBackup to initialize.

start requests the KafkaConsumer to consume records from the commandTopicPartition only (using KafkaConsumer.assign).

start is used when:

  • CommandStore is requested to start
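
The startup sequence described above can be illustrated with a minimal, dependency-free sketch. It is not the ksqlDB source: TopicPartitionSketch, AssignableConsumer, BackupSketch and CommandTopicStartSketch are hypothetical stand-ins for Kafka's TopicPartition, Kafka's Consumer, CommandTopicBackup and CommandTopic, and the topic name is made up. With the real Kafka client, the assignment step would presumably be consumer.assign(Collections.singleton(new TopicPartition(commandTopicName, 0))).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Kafka's TopicPartition (topic name + partition number).
final class TopicPartitionSketch {
  final String topic;
  final int partition;

  TopicPartitionSketch(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }
}

// Hypothetical stand-in for the assign side of Kafka's Consumer.
interface AssignableConsumer {
  void assign(Collection<TopicPartitionSketch> partitions);
}

// Hypothetical stand-in for CommandTopicBackup.
interface BackupSketch {
  void initialize();
}

final class CommandTopicStartSketch {
  private final TopicPartitionSketch commandTopicPartition;
  private final AssignableConsumer commandConsumer;
  private final BackupSketch backup;

  CommandTopicStartSketch(
      String commandTopicName, AssignableConsumer commandConsumer, BackupSketch backup) {
    // The command topic is read from partition 0 only.
    this.commandTopicPartition = new TopicPartitionSketch(commandTopicName, 0);
    this.commandConsumer = commandConsumer;
    this.backup = backup;
  }

  void start() {
    // Initialize the backup, then pin the consumer to the single partition.
    backup.initialize();
    commandConsumer.assign(Collections.singleton(commandTopicPartition));
  }

  // Runs start() against recording stand-ins and returns the observed calls in order.
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    CommandTopicStartSketch topic = new CommandTopicStartSketch(
        "demo_command_topic", // made-up name, for illustration only
        partitions -> partitions.forEach(
            tp -> events.add("assign " + tp.topic + "-" + tp.partition)),
        () -> events.add("backup initialized"));
    topic.start();
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Assigning the partition explicitly, rather than subscribing to the topic, means the consumer does not take part in consumer-group rebalancing and always reads the same partition.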

Fetching New Commands

Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(
  Duration timeout)

getNewCommands requests the commandConsumer to fetch records (ConsumerRecord<byte[], byte[]>s).

getNewCommands makes a backup of every record.
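
A rough sketch of this fetch-and-back-up loop follows, again without the Kafka dependency. RecordSketch, PollingConsumer, RecordBackup (including its writeRecord method) and GetNewCommandsSketch are hypothetical stand-ins for ConsumerRecord<byte[], byte[]>, the Consumer, CommandTopicBackup and CommandTopic. The real method may handle backup failures differently.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Kafka's ConsumerRecord<byte[], byte[]>.
final class RecordSketch {
  final byte[] key;
  final byte[] value;

  RecordSketch(byte[] key, byte[] value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical stand-in for the poll side of Kafka's Consumer.
interface PollingConsumer {
  Iterable<RecordSketch> poll(Duration timeout);
}

// Hypothetical stand-in for the record-writing side of CommandTopicBackup.
interface RecordBackup {
  void writeRecord(RecordSketch record);
}

final class GetNewCommandsSketch {
  private final PollingConsumer commandConsumer;
  private final RecordBackup backup;

  GetNewCommandsSketch(PollingConsumer commandConsumer, RecordBackup backup) {
    this.commandConsumer = commandConsumer;
    this.backup = backup;
  }

  Iterable<RecordSketch> getNewCommands(Duration timeout) {
    List<RecordSketch> records = new ArrayList<>();
    for (RecordSketch record : commandConsumer.poll(timeout)) {
      backup.writeRecord(record); // every fetched record is backed up
      records.add(record);
    }
    return records;
  }

  // Feeds two in-memory records through getNewCommands; returns the number backed up.
  static int demo() {
    List<RecordSketch> backedUp = new ArrayList<>();
    List<RecordSketch> pending = Arrays.asList(
        new RecordSketch(bytes("id-1"), bytes("command-1")),
        new RecordSketch(bytes("id-2"), bytes("command-2")));
    GetNewCommandsSketch topic =
        new GetNewCommandsSketch(timeout -> pending, backedUp::add);
    topic.getNewCommands(Duration.ofMillis(100));
    return backedUp.size();
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println("records backed up: " + demo());
  }
}
```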

getNewCommands is used when: