CommandTopic

CommandTopic manages a KafkaConsumer to fetch new commands (for CommandStore) from the internal command topic.

Figure: CommandTopic and CommandStore

Note

KsqlRestApplication uses the commandTopic utility to build the name of the command topic.

Creating Instance

CommandTopic takes the following to be created:

  • Name of the internal command topic
  • KafkaConsumer Properties
  • CommandTopicBackup

CommandTopic is created when:

KafkaConsumer

When created, CommandTopic is given either a Consumer<byte[], byte[]> or the properties to create one.

The Consumer is assigned the commandTopicPartition in start.

The Consumer fetches ConsumerRecord<byte[], byte[]> records when CommandTopic is requested for new commands.

commandTopicPartition

When created, CommandTopic creates a Kafka TopicPartition for the commandTopicName and partition 0.

The commandTopicPartition is assigned to commandConsumer in start.

Starting Up

void start()

start requests the CommandTopicBackup to initialize.

start requests the KafkaConsumer to consume records from the commandTopicPartition only (using KafkaConsumer.assign).
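
The start-up sequence above can be sketched as follows. This is a minimal, dependency-free model and not ksqlDB's actual code: TopicPartition, Consumer and Backup are simplified stand-ins declared in the block itself (named after Kafka's TopicPartition, the Consumer.assign method and CommandTopicBackup), and the topic name used in main is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Dependency-free model of the start-up sequence; not ksqlDB's actual code.
class CommandTopicStartSketch {

  // Stand-in for org.apache.kafka.common.TopicPartition.
  static final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    @Override
    public String toString() {
      return topic + "-" + partition;
    }
  }

  // Stand-in for the part of Kafka's Consumer that start relies on.
  interface Consumer {
    void assign(Collection<TopicPartition> partitions);
  }

  // Stand-in for CommandTopicBackup.
  interface Backup {
    void initialize();
  }

  private final TopicPartition commandTopicPartition;
  private final Consumer commandConsumer;
  private final Backup commandTopicBackup;

  CommandTopicStartSketch(String commandTopicName, Consumer consumer, Backup backup) {
    // The command topic is always read from partition 0.
    this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
    this.commandConsumer = consumer;
    this.commandTopicBackup = backup;
  }

  void start() {
    // 1. Initialize the backup first.
    commandTopicBackup.initialize();
    // 2. Pin the consumer to the single command topic partition
    //    (manual assignment, no consumer-group subscription).
    commandConsumer.assign(Collections.singleton(commandTopicPartition));
  }

  // Runs start against recording stubs and returns the calls in order.
  static List<String> demo(String commandTopicName) {
    List<String> calls = new ArrayList<>();
    CommandTopicStartSketch topic = new CommandTopicStartSketch(
        commandTopicName,
        partitions -> calls.add("assign " + partitions),
        () -> calls.add("initialize backup"));
    topic.start();
    return calls;
  }

  public static void main(String[] args) {
    // "example_command_topic" is a hypothetical name, used for illustration only.
    demo("example_command_topic").forEach(System.out::println);
  }
}
```

With the real Kafka client, the second step would correspond to calling assign with a singleton collection holding the commandTopicPartition. Manual assignment fits here because the command topic is read from one known partition, so there is nothing for a consumer group to balance.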


start is used when:

  • CommandStore is requested to start

Fetching New Commands

Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(
  Duration timeout)

getNewCommands requests the commandConsumer to fetch records (ConsumerRecord<byte[], byte[]>s).

getNewCommands requests the CommandTopicBackup to back up every record fetched.
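
The fetch-then-back-up flow can be illustrated with a small sketch. It assumes nothing beyond the description above: Record is a hypothetical stand-in for ConsumerRecord<byte[], byte[]>, the poller function stands in for the commandConsumer fetching records within the timeout (in the spirit of Kafka's Consumer.poll(Duration)), and the backup callback stands in for CommandTopicBackup. The real implementation may differ in its details.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Dependency-free model of getNewCommands; not ksqlDB's actual code.
class GetNewCommandsSketch {

  // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
  static final class Record {
    final byte[] key;
    final byte[] value;

    Record(byte[] key, byte[] value) {
      this.key = key;
      this.value = value;
    }
  }

  // poller: stand-in for the consumer fetching records within the timeout.
  // backup: stand-in for CommandTopicBackup, called once per fetched record.
  static Iterable<Record> getNewCommands(
      Duration timeout,
      Function<Duration, List<Record>> poller,
      Consumer<Record> backup) {
    List<Record> records = poller.apply(timeout);
    for (Record record : records) {
      backup.accept(record);
    }
    return records;
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    List<Record> backedUp = new ArrayList<>();
    // Two made-up records that the stub poller hands back.
    List<Record> fetched = Arrays.asList(
        new Record(bytes("k1"), bytes("v1")),
        new Record(bytes("k2"), bytes("v2")));

    Iterable<Record> commands =
        getNewCommands(Duration.ofSeconds(5), timeout -> fetched, backedUp::add);

    int returned = 0;
    for (Record ignored : commands) {
      returned++;
    }
    System.out.println("returned=" + returned + " backedUp=" + backedUp.size());
  }
}
```

Backing up each record before handing the batch to the caller means the backup never lags behind what the caller (CommandStore, per the description above) has already seen.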


getNewCommands is used when: