CommandStore¶
CommandStore
is a CommandQueue that uses the CommandTopic for fetching commands.
CommandStore
is used to create the following services while building a KsqlRestApplication:
Creating Instance¶
CommandStore
takes the following to be created:
- Name of the Command Topic
- CommandTopic
- SequenceNumberFutureStore
- Kafka Consumer Properties
- Kafka Producer properties
- ksql.server.command.response.timeout.ms
-
Serializer<CommandId>
-
Serializer<Command>
-
Deserializer<CommandId>
-
CommandTopicBackup
CommandStore
is created using Factory.create utility.
SequenceNumberFutureStore¶
CommandStore
is given a SequenceNumberFutureStore when created.
The SequenceNumberFutureStore
is used when:
- ensureConsumedPast (to pause until earlier commands are completed)
- completeSatisfiedSequenceNumberFutures (to mark commands completed)
Kafka Consumer Properties¶
Map<String, Object> kafkaConsumerProperties
CommandStore
is given kafkaConsumerProperties
when created.
The properties are used to create a KafkaConsumer
(Apache Kafka) for the end offset in the command topic.
Kafka Producer properties¶
Map<String, Object> kafkaProducerProperties
CommandStore
is given kafkaProducerProperties
when created.
The properties are used to create a KafkaProducer
(Apache Kafka) at createTransactionalProducer.
CommandTopic¶
CommandStore
is given a CommandTopic when created.
The CommandTopic
is started in start and runs until close.
Used when:
- getNewCommands
- getCommandTopicName
- getRestoreCommands
- isEmpty
- completeSatisfiedSequenceNumberFutures
- wakeup
Command Topic Name¶
CommandStore
is given the name of the command topic when created.
The topic name is used when:
- enqueueCommand (to send commands to)
- getNewCommands (to fetch commands from)
- getCommandTopicOffset (to fetch command offsets from)
Partition 0¶
CommandStore
uses just a single partition (0
) for enqueueCommand onto and getCommandTopicOffset from the commandTopicName.
Creating CommandStore¶
CommandStore create(
KsqlConfig ksqlConfig,
String commandTopicName,
Duration commandQueueCatchupTimeout,
Map<String, Object> kafkaConsumerProperties,
Map<String, Object> kafkaProducerProperties,
KafkaTopicClient internalTopicClient)
create
adds the following configuration properties to the given kafkaConsumerProperties
(possibly overriding the current values if set).
Configuration Property | Value |
---|---|
isolation.level | READ_COMMITTED |
auto.offset.reset | none |
create
adds the following configuration properties to the given kafkaProducerProperties
(possibly overriding the current values if set).
Configuration Property | Value |
---|---|
transactional.id | ksql.service.id |
acks | all |
create
creates a CommandTopicBackup
(based on ksql.metastore.backup.location).
In the end, create
creates a CommandStore with the following:
- The given
commandTopicName
- A new CommandTopic
- A new
SequenceNumberFutureStore
- others
create
is used when:
KsqlRestApplication
utility is used to build a KsqlRestApplication instance
Starting Up¶
void start()
start
requests the CommandTopic to start.
start
is used when:
KsqlRestApplication
is requested to initialize
enqueueCommand¶
QueuedCommandStatus enqueueCommand(
CommandId commandId,
Command command,
Producer<CommandId, Command> transactionalProducer)
enqueueCommand
is part of the CommandQueue abstraction.
enqueueCommand
creates a ProducerRecord
(Apache Kafka) as follows:
- Topic: commandTopicName
- Partition:
0
- Key: the given
commandId
- Value: the given Command
enqueueCommand
requests the given transactionalProducer
to send the record.
enqueueCommand
returns a QueuedCommandStatus
with the record offset (and a CommandStatusFuture
).
Fetching New Commands¶
List<QueuedCommand> getNewCommands(
Duration timeout)
getNewCommands
is part of the CommandQueue abstraction.
getNewCommands
completeSatisfiedSequenceNumberFutures.
getNewCommands
requests the CommandTopic for new commands (ConsumerRecord<byte[], byte[]>
s).
getNewCommands
creates a QueuedCommand
for every new command with a non-null
value.
getNewCommands
returns the QueuedCommand
s.
completeSatisfiedSequenceNumberFutures¶
void completeSatisfiedSequenceNumberFutures()
completeSatisfiedSequenceNumberFutures
requests the CommandTopic for the current consumer position that is then used to request the SequenceNumberFutureStore to complete futures up to this position.
Creating Transactional Kafka Producer¶
Producer<CommandId, Command> createTransactionalProducer()
createTransactionalProducer
is part of the CommandQueue abstraction.
createTransactionalProducer
creates a KafkaProducer
(Apache Kafka) with the following:
- kafkaProducerProperties
- commandIdSerializer as the key serializer
- commandSerializer as the value serializer
ensureConsumedPast¶
void ensureConsumedPast(
long seqNum,
Duration timeout)
ensureConsumedPast
is part of the CommandQueue abstraction.
ensureConsumedPast
requests the SequenceNumberFutureStore for a CompletableFuture for the given sequence number.
In the end, ensureConsumedPast
waits for this future to complete (if necessary) for at most given timeout
seconds.
Completing CompletableFuture
CompletableFuture
s are completed in completeSatisfiedSequenceNumberFutures.
waitForCommandConsumer¶
void waitForCommandConsumer()
waitForCommandConsumer
is part of the CommandQueue abstraction.
waitForCommandConsumer
ensureConsumedPast (with the end offset in the command topic and commandQueueCatchupTimeout).
End Offset in Command Topic¶
long getCommandTopicOffset()
getCommandTopicOffset
creates a TopicPartition
for the command topic (with the name and partition 0).
getCommandTopicOffset
creates a KafkaConsumer
(Apache Kafka) with the consumer properties (and ByteArrayDeserializer
s for the keys and values).
getCommandTopicOffset
requests the KafkaConsumer
to KafkaConsumer.assign
itself to consume records from the TopicPartition
only and then KafkaConsumer.endOffsets
.
Logging¶
Enable ALL
logging level for io.confluent.ksql.rest.server.computation.CommandStore
logger to see what happens inside.
Add the following line to log4j.properties
:
log4j.logger.io.confluent.ksql.rest.server.computation.CommandStore=ALL
Refer to Logging.