CommandTopic¶
CommandTopic manages a KafkaConsumer to fetch new commands (for CommandStore) from the internal command topic.

Note
KsqlRestApplication uses the commandTopic utility to build the name of the command topic.
Creating Instance¶
CommandTopic takes the following to be created:
- Name of the internal command topic
- KafkaConsumer properties
- CommandTopicBackup
CommandTopic is created when:
- CommandStore.Factory is requested to create a CommandStore
KafkaConsumer¶
When created, CommandTopic is given either a Consumer<byte[], byte[]> or the properties to create one.
The Consumer is assigned the commandTopicPartition in start.
The Consumer fetches ConsumerRecord<byte[], byte[]>s when requested for new commands.
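The two ways of supplying the consumer can be pictured as a pair of constructors, one delegating to the other. The following is a minimal sketch under stated assumptions, not the ksqlDB source: CtorSketchConsumer is a hypothetical stand-in for KafkaConsumer<byte[], byte[]>, CommandTopicCtorSketch is a hypothetical stand-in for CommandTopic, and the CommandTopicBackup argument is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for KafkaConsumer<byte[], byte[]>; it only remembers its configuration.
final class CtorSketchConsumer {
  final Map<String, Object> config;

  CtorSketchConsumer(Map<String, Object> config) {
    this.config = new HashMap<>(Objects.requireNonNull(config, "config"));
  }
}

// Hypothetical stand-in for CommandTopic, reduced to the two ways of getting a consumer.
final class CommandTopicCtorSketch {
  final String commandTopicName;
  final CtorSketchConsumer commandConsumer;

  // Properties variant: create the consumer first, then delegate.
  CommandTopicCtorSketch(String commandTopicName, Map<String, Object> kafkaConsumerProperties) {
    this(commandTopicName, new CtorSketchConsumer(kafkaConsumerProperties));
  }

  // Consumer variant: accepts a consumer that was built elsewhere (for example, in a test).
  CommandTopicCtorSketch(String commandTopicName, CtorSketchConsumer commandConsumer) {
    this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
    this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
  }
}
```

Keeping a consumer-accepting constructor alongside the properties-based one is a common way to make such a class testable without a running broker.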
commandTopicPartition¶
CommandTopic creates a Kafka TopicPartition for the commandTopicName and partition 0 when created.
The commandTopicPartition is assigned to commandConsumer in start.
Starting Up¶
void start()
start requests the CommandTopicBackup to initialize.
start requests the KafkaConsumer to consume records from the commandTopicPartition only (using KafkaConsumer.assign).
start is used when:
- CommandStore is requested to start
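The startup order described above (initialize the backup, then assign the single partition) can be sketched as follows. This is an illustration under assumptions rather than the actual ksqlDB code: StartSketchPartition, StartSketchConsumer and StartSketchBackup are hypothetical stand-ins for Kafka's TopicPartition, the consumer's assign call and CommandTopicBackup, so that the example compiles without the Kafka client library.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for Kafka's TopicPartition (topic name plus partition number).
final class StartSketchPartition {
  final String topic;
  final int partition;

  StartSketchPartition(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }
}

// Hypothetical stand-in for the one consumer call that start relies on.
interface StartSketchConsumer {
  void assign(Collection<StartSketchPartition> partitions);
}

// Hypothetical stand-in for CommandTopicBackup.
interface StartSketchBackup {
  void initialize();
}

final class CommandTopicStartSketch {
  private final StartSketchPartition commandTopicPartition;
  private final StartSketchConsumer commandConsumer;
  private final StartSketchBackup commandTopicBackup;

  CommandTopicStartSketch(
      String commandTopicName, StartSketchConsumer commandConsumer, StartSketchBackup commandTopicBackup) {
    // The partition is fixed at creation time: the given topic name and partition 0.
    this.commandTopicPartition = new StartSketchPartition(commandTopicName, 0);
    this.commandConsumer = commandConsumer;
    this.commandTopicBackup = commandTopicBackup;
  }

  void start() {
    // Step 1: let the backup prepare itself.
    commandTopicBackup.initialize();
    // Step 2: manually assign the single partition (no consumer-group subscription).
    commandConsumer.assign(Collections.singleton(commandTopicPartition));
  }
}
```

With the real client, manual assignment through assign means the consumer reads exactly the listed partitions and takes no part in consumer-group rebalancing.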
Fetching New Commands¶
Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(
  Duration timeout)
getNewCommands requests the commandConsumer to fetch records (ConsumerRecord<byte[], byte[]>s), waiting up to the given timeout.
getNewCommands requests the CommandTopicBackup to back up every record fetched.
getNewCommands is used when:
- CommandStore is requested for new commands
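The fetch-then-backup flow of getNewCommands can be sketched as below. It is a simplified model under assumptions, not the ksqlDB implementation: FetchSketchRecord, FetchSketchConsumer and FetchSketchBackup are hypothetical stand-ins for ConsumerRecord<byte[], byte[]>, the consumer and CommandTopicBackup, and the backup method name writeRecord is assumed for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
final class FetchSketchRecord {
  final byte[] key;
  final byte[] value;

  FetchSketchRecord(byte[] key, byte[] value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical stand-in for the consumer's poll call.
interface FetchSketchConsumer {
  List<FetchSketchRecord> poll(Duration timeout);
}

// Hypothetical stand-in for CommandTopicBackup; the method name is an assumption.
interface FetchSketchBackup {
  void writeRecord(FetchSketchRecord consumerRecord);
}

final class CommandTopicFetchSketch {
  private final FetchSketchConsumer commandConsumer;
  private final FetchSketchBackup commandTopicBackup;

  CommandTopicFetchSketch(FetchSketchConsumer commandConsumer, FetchSketchBackup commandTopicBackup) {
    this.commandConsumer = commandConsumer;
    this.commandTopicBackup = commandTopicBackup;
  }

  Iterable<FetchSketchRecord> getNewCommands(Duration timeout) {
    // Wait up to the timeout for whatever records are available.
    List<FetchSketchRecord> fetched = commandConsumer.poll(timeout);
    List<FetchSketchRecord> newCommands = new ArrayList<>();
    for (FetchSketchRecord consumerRecord : fetched) {
      // Every record is backed up before it is handed to the caller.
      commandTopicBackup.writeRecord(consumerRecord);
      newCommands.add(consumerRecord);
    }
    return newCommands;
  }
}
```

In this sketch the records are returned in the order they were polled, and the backup sees each record exactly once per fetch.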