TransactionManager

Creating Instance

TransactionManager takes the following to be created:

TransactionManager is created along with KafkaProducer (when idempotenceEnabled is on).

States

TransactionManager can be in one of the following states:

  • UNINITIALIZED
  • INITIALIZING
  • READY
  • IN_TRANSACTION
  • COMMITTING_TRANSACTION
  • ABORTING_TRANSACTION
  • ABORTABLE_ERROR
  • FATAL_ERROR

Valid Transitions

| Source (Current) State | Target States | transitionTo |
|---|---|---|
| ABORTABLE_ERROR | ABORTING_TRANSACTION, ABORTABLE_ERROR | |
| ABORTING_TRANSACTION | INITIALIZING, READY | beginAbort |
| COMMITTING_TRANSACTION | READY, ABORTABLE_ERROR | beginCommit |
| IN_TRANSACTION | COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR | beginTransaction |
| INITIALIZING | READY | |
| READY | UNINITIALIZED, IN_TRANSACTION | |
| UNINITIALIZED | INITIALIZING | resetIdempotentProducerId |
| any state | FATAL_ERROR | |
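
The transition rules above can be read as a lookup from a source state to its allowed target states. The following is a minimal sketch of such a check, not the actual Kafka implementation: the class name TransitionSketch and the helpers targetsOf and isTransitionValid are names chosen for illustration, and only the state names and the allowed transitions come from the table.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only: models the Valid Transitions table, not Kafka's own code.
final class TransitionSketch {
    enum State {
        UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
        COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
    }

    // Target states allowed from each source state, as listed in the table.
    static Set<State> targetsOf(State source) {
        Set<State> targets;
        switch (source) {
            case UNINITIALIZED:
                targets = EnumSet.of(State.INITIALIZING);
                break;
            case INITIALIZING:
                targets = EnumSet.of(State.READY);
                break;
            case READY:
                targets = EnumSet.of(State.UNINITIALIZED, State.IN_TRANSACTION);
                break;
            case IN_TRANSACTION:
                targets = EnumSet.of(State.COMMITTING_TRANSACTION,
                        State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR);
                break;
            case COMMITTING_TRANSACTION:
                targets = EnumSet.of(State.READY, State.ABORTABLE_ERROR);
                break;
            case ABORTING_TRANSACTION:
                targets = EnumSet.of(State.INITIALIZING, State.READY);
                break;
            case ABORTABLE_ERROR:
                targets = EnumSet.of(State.ABORTING_TRANSACTION, State.ABORTABLE_ERROR);
                break;
            default:
                // FATAL_ERROR has no row of its own in the table.
                targets = EnumSet.noneOf(State.class);
                break;
        }
        // The "any state" row: every state may move to FATAL_ERROR.
        targets.add(State.FATAL_ERROR);
        return targets;
    }

    static boolean isTransitionValid(State source, State target) {
        return targetsOf(source).contains(target);
    }

    public static void main(String[] args) {
        System.out.println(isTransitionValid(State.READY, State.IN_TRANSACTION));         // prints true
        System.out.println(isTransitionValid(State.READY, State.COMMITTING_TRANSACTION)); // prints false
    }
}
```

Keeping the rules in one lookup makes it easy to compare the code against the table row by row.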

beginAbort

TransactionalRequestResult beginAbort()

beginAbort...FIXME

beginAbort is used when:

  • KafkaProducer is requested to abortTransaction
  • Sender is requested to run (and is shutting down)

beginCommit

TransactionalRequestResult beginCommit()

beginCommit...FIXME

beginCommit is used when:

beginCompletingTransaction

TransactionalRequestResult beginCompletingTransaction(
  TransactionResult transactionResult)

beginCompletingTransaction...FIXME

beginCompletingTransaction is used when:

beginTransaction

void beginTransaction()

beginTransaction makes sure that the producer is transactional and transitions to the IN_TRANSACTION state.
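
The two steps (check that the producer is transactional, then move to IN_TRANSACTION) could be sketched as follows. This is a simplified model, not the Kafka source: the class BeginTransactionSketch is hypothetical, it starts in READY to keep the example short, and it treats a null transactionalId as "not transactional". The exception type and messages are assumptions as well.

```java
// Illustrative only: a toy model of the guard-then-transition behaviour.
final class BeginTransactionSketch {
    enum State { READY, IN_TRANSACTION }

    // Assumption: null models a producer created without a transactional ID.
    private final String transactionalId;
    // Assumption: the sketch starts in READY, skipping initialization.
    private State currentState = State.READY;

    BeginTransactionSketch(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    boolean isTransactional() {
        return transactionalId != null;
    }

    State currentState() {
        return currentState;
    }

    void beginTransaction() {
        // Step 1: refuse to start a transaction on a non-transactional producer.
        if (!isTransactional()) {
            throw new IllegalStateException("Producer is not transactional");
        }
        // Step 2: READY is the only source state that may move to IN_TRANSACTION.
        if (currentState != State.READY) {
            throw new IllegalStateException(
                    "Invalid transition from " + currentState + " to " + State.IN_TRANSACTION);
        }
        currentState = State.IN_TRANSACTION;
    }

    public static void main(String[] args) {
        BeginTransactionSketch tx = new BeginTransactionSketch("demo-tx-id"); // hypothetical ID
        tx.beginTransaction();
        System.out.println(tx.currentState()); // prints IN_TRANSACTION
    }
}
```

Calling beginTransaction a second time without completing the first transaction fails in this sketch, because IN_TRANSACTION is not a valid source state for IN_TRANSACTION.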

beginTransaction is used when:

initializeTransactions

TransactionalRequestResult initializeTransactions() // Uses ProducerIdAndEpoch.NONE
TransactionalRequestResult initializeTransactions(
  ProducerIdAndEpoch producerIdAndEpoch)

initializeTransactions...FIXME

initializeTransactions is used when:

isTransactional

boolean isTransactional()

isTransactional is true when the transactional.id configuration property is defined for the producer (that is, a transactionalId was given when the TransactionManager was created).
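
As a rough illustration of how the configuration property relates to the flag, the sketch below reads transactional.id from a java.util.Properties object and derives the flag from whether a value is present. The class IsTransactionalSketch and the helper transactionalIdOf are hypothetical, and the ID value is made up.

```java
import java.util.Properties;

// Illustrative only: shows the relationship between the property and the flag.
final class IsTransactionalSketch {
    // Hypothetical helper: returns null when transactional.id is not set.
    static String transactionalIdOf(Properties props) {
        return props.getProperty("transactional.id");
    }

    // The flag is true only when a transactional ID was given.
    static boolean isTransactional(String transactionalId) {
        return transactionalId != null;
    }

    public static void main(String[] args) {
        Properties plain = new Properties();
        Properties transactional = new Properties();
        transactional.setProperty("transactional.id", "demo-tx-id"); // hypothetical ID

        System.out.println(isTransactional(transactionalIdOf(plain)));         // prints false
        System.out.println(isTransactional(transactionalIdOf(transactional))); // prints true
    }
}
```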

maybeAddPartitionToTransaction

void maybeAddPartitionToTransaction(
  TopicPartition topicPartition)

maybeAddPartitionToTransaction...FIXME

maybeAddPartitionToTransaction is used when:

  • KafkaProducer is requested to doSend

sendOffsetsToTransaction

TransactionalRequestResult sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  ConsumerGroupMetadata groupMetadata)

sendOffsetsToTransaction...FIXME

sendOffsetsToTransaction is used when: