Skip to content


CommandStore is a CommandQueue.

Creating Instance

CommandStore takes the following to be created:

  • Name of the Command Topic
  • CommandTopic
  • SequenceNumberFutureStore
  • Kafka Consumer Properties
  • Kafka Producer Properties
  • Command Queue Catchup Timeout
  • Serializer<CommandId>
  • Serializer<Command>
  • Deserializer<CommandId>
  • CommandTopicBackup

CommandStore is created using create utility.


CommandStore create(
  KsqlConfig ksqlConfig,
  String commandTopicName,
  Duration commandQueueCatchupTimeout,
  Map<String, Object> kafkaConsumerProperties,
  Map<String, Object> kafkaProducerProperties,
  ServiceContext serviceContext)


create is used when:


QueuedCommandStatus enqueueCommand(
  CommandId commandId,
  Command command,
  Producer<CommandId, Command> transactionalProducer)

enqueueCommand creates a ProducerRecord (Apache Kafka) as follows:

enqueueCommand requests the given transactionalProducer to send the record.

enqueueCommand returns a QueuedCommandStatus with the record offset (and a CommandStatusFuture).

enqueueCommand is part of the CommandQueue abstraction.

Back to top