CommandStore¶
CommandStore
is a CommandQueue.
Creating Instance¶
CommandStore
takes the following to be created:
- Name of the Command Topic
-
CommandTopic
-
SequenceNumberFutureStore
- Kafka Consumer Properties
- Kafka Producer Properties
- Command Queue Catchup Timeout
-
Serializer<CommandId>
-
Serializer<Command>
-
Deserializer<CommandId>
-
CommandTopicBackup
CommandStore
is created using create utility.
create¶
CommandStore create(
KsqlConfig ksqlConfig,
String commandTopicName,
Duration commandQueueCatchupTimeout,
Map<String, Object> kafkaConsumerProperties,
Map<String, Object> kafkaProducerProperties,
ServiceContext serviceContext)
create
...FIXME
create
is used when:
KsqlRestApplication
utility is used to build a KsqlRestApplication instance
enqueueCommand¶
QueuedCommandStatus enqueueCommand(
CommandId commandId,
Command command,
Producer<CommandId, Command> transactionalProducer)
enqueueCommand
creates a ProducerRecord
(Apache Kafka) as follows:
- Topic: commandTopicName
- Partition:
0
- Key: the given
commandId
- Value: the given Command
enqueueCommand
requests the given transactionalProducer
to send the record.
enqueueCommand
returns a QueuedCommandStatus
with the record offset (and a CommandStatusFuture
).
enqueueCommand
is part of the CommandQueue abstraction.