Skip to content

CommandStore

CommandStore is a CommandQueue.

Creating Instance

CommandStore takes the following to be created:

  • Name of the Command Topic
  • CommandTopic
  • SequenceNumberFutureStore
  • Kafka Consumer Properties
  • Kafka Producer Properties
  • Command Queue Catchup Timeout
  • Serializer<CommandId>
  • Serializer<Command>
  • Deserializer<CommandId>
  • CommandTopicBackup

CommandStore is created using create utility.

create

CommandStore create(
  KsqlConfig ksqlConfig,
  String commandTopicName,
  Duration commandQueueCatchupTimeout,
  Map<String, Object> kafkaConsumerProperties,
  Map<String, Object> kafkaProducerProperties,
  ServiceContext serviceContext)

create...FIXME

create is used when:

enqueueCommand

QueuedCommandStatus enqueueCommand(
  CommandId commandId,
  Command command,
  Producer<CommandId, Command> transactionalProducer)

enqueueCommand creates a ProducerRecord (Apache Kafka) as follows:

enqueueCommand requests the given transactionalProducer to send the record.

enqueueCommand returns a QueuedCommandStatus with the record offset (and a CommandStatusFuture).

enqueueCommand is part of the CommandQueue abstraction.

Back to top