CommandStore¶
CommandStore is a CommandQueue that uses the CommandTopic for fetching commands.
CommandStore is used to create the following services while building a KsqlRestApplication:

Creating Instance¶
CommandStore takes the following to be created:
- Name of the Command Topic
- CommandTopic
- SequenceNumberFutureStore
- Kafka Consumer Properties
- Kafka Producer properties
- ksql.server.command.response.timeout.ms
-
Serializer<CommandId> -
Serializer<Command> -
Deserializer<CommandId> -
CommandTopicBackup
CommandStore is created using Factory.create utility.
SequenceNumberFutureStore¶
CommandStore is given a SequenceNumberFutureStore when created.
The SequenceNumberFutureStore is used when:
- ensureConsumedPast (to pause until earlier commands are completed)
- completeSatisfiedSequenceNumberFutures (to mark commands completed)
Kafka Consumer Properties¶
Map<String, Object> kafkaConsumerProperties
CommandStore is given kafkaConsumerProperties when created.
The properties are used to create a KafkaConsumer (Apache Kafka) for the end offset in the command topic.
Kafka Producer properties¶
Map<String, Object> kafkaProducerProperties
CommandStore is given kafkaProducerProperties when created.
The properties are used to create a KafkaProducer (Apache Kafka) at createTransactionalProducer.
CommandTopic¶
CommandStore is given a CommandTopic when created.
The CommandTopic is started in start and runs until close.
Used when:
- getNewCommands
- getCommandTopicName
- getRestoreCommands
- isEmpty
- completeSatisfiedSequenceNumberFutures
- wakeup
Command Topic Name¶
CommandStore is given the name of the command topic when created.
The topic name is used when:
- enqueueCommand (to send commands to)
- getNewCommands (to fetch commands from)
- getCommandTopicOffset (to fetch command offsets from)
Partition 0¶
CommandStore uses just a single partition (0) for enqueueCommand onto and getCommandTopicOffset from the commandTopicName.
Creating CommandStore¶
CommandStore create(
KsqlConfig ksqlConfig,
String commandTopicName,
Duration commandQueueCatchupTimeout,
Map<String, Object> kafkaConsumerProperties,
Map<String, Object> kafkaProducerProperties,
KafkaTopicClient internalTopicClient)
create adds the following configuration properties to the given kafkaConsumerProperties (possibly overriding the current values if set).
| Configuration Property | Value |
|---|---|
isolation.level | READ_COMMITTED |
auto.offset.reset | none |
create adds the following configuration properties to the given kafkaProducerProperties (possibly overriding the current values if set).
| Configuration Property | Value |
|---|---|
transactional.id | ksql.service.id |
acks | all |
create creates a CommandTopicBackup (based on ksql.metastore.backup.location).
In the end, create creates a CommandStore with the following:
- The given
commandTopicName - A new CommandTopic
- A new
SequenceNumberFutureStore - others
create is used when:
KsqlRestApplicationutility is used to build a KsqlRestApplication instance
Starting Up¶
void start()
start requests the CommandTopic to start.
start is used when:
KsqlRestApplicationis requested to initialize
enqueueCommand¶
QueuedCommandStatus enqueueCommand(
CommandId commandId,
Command command,
Producer<CommandId, Command> transactionalProducer)
enqueueCommand is part of the CommandQueue abstraction.
enqueueCommand creates a ProducerRecord (Apache Kafka) as follows:
- Topic: commandTopicName
- Partition:
0 - Key: the given
commandId - Value: the given Command
enqueueCommand requests the given transactionalProducer to send the record.
enqueueCommand returns a QueuedCommandStatus with the record offset (and a CommandStatusFuture).
Fetching New Commands¶
List<QueuedCommand> getNewCommands(
Duration timeout)
getNewCommands is part of the CommandQueue abstraction.
getNewCommands completeSatisfiedSequenceNumberFutures.
getNewCommands requests the CommandTopic for new commands (ConsumerRecord<byte[], byte[]>s).
getNewCommands creates a QueuedCommand for every new command with a non-null value.
getNewCommands returns the QueuedCommands.
completeSatisfiedSequenceNumberFutures¶
void completeSatisfiedSequenceNumberFutures()
completeSatisfiedSequenceNumberFutures requests the CommandTopic for the current consumer position that is then used to request the SequenceNumberFutureStore to complete futures up to this position.
Creating Transactional Kafka Producer¶
Producer<CommandId, Command> createTransactionalProducer()
createTransactionalProducer is part of the CommandQueue abstraction.
createTransactionalProducer creates a KafkaProducer (Apache Kafka) with the following:
- kafkaProducerProperties
- commandIdSerializer as the key serializer
- commandSerializer as the value serializer
ensureConsumedPast¶
void ensureConsumedPast(
long seqNum,
Duration timeout)
ensureConsumedPast is part of the CommandQueue abstraction.
ensureConsumedPast requests the SequenceNumberFutureStore for a CompletableFuture for the given sequence number.
In the end, ensureConsumedPast waits for this future to complete (if necessary) for at most given timeout seconds.
Completing CompletableFuture
CompletableFutures are completed in completeSatisfiedSequenceNumberFutures.
waitForCommandConsumer¶
void waitForCommandConsumer()
waitForCommandConsumer is part of the CommandQueue abstraction.
waitForCommandConsumer ensureConsumedPast (with the end offset in the command topic and commandQueueCatchupTimeout).
End Offset in Command Topic¶
long getCommandTopicOffset()
getCommandTopicOffset creates a TopicPartition for the command topic (with the name and partition 0).
getCommandTopicOffset creates a KafkaConsumer (Apache Kafka) with the consumer properties (and ByteArrayDeserializers for the keys and values).
getCommandTopicOffset requests the KafkaConsumer to KafkaConsumer.assign itself to consume records from the TopicPartition only and then KafkaConsumer.endOffsets.
Logging¶
Enable ALL logging level for io.confluent.ksql.rest.server.computation.CommandStore logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.computation.CommandStore=ALL
Refer to Logging.