TransactionManager¶
Creating Instance¶
TransactionManager takes the following to be created:
- LogContext
- transactional.id
- transaction.timeout.ms
- retry.backoff.ms
- ApiVersions
TransactionManager is created along with KafkaProducer (with idempotenceEnabled).
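The configuration properties listed above are ordinary producer settings. As a minimal sketch, they could be assembled as shown below. The broker address and all values are illustrative assumptions, and `TransactionalProducerConfigSketch` is a hypothetical helper that is not part of Kafka. Passing such properties to a `KafkaProducer` is not shown here, so the sketch runs with the JDK alone.

```java
import java.util.Properties;

final class TransactionalProducerConfigSketch {

    // Builds producer properties that name a transactional.id.
    // All values are illustrative assumptions, not recommendations.
    static Properties transactionalProps(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("enable.idempotence", "true");
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("transaction.timeout.ms", "60000");
        props.setProperty("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalProps("my-transactional-id");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```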
States¶
TransactionManager can be in one of the following states:
- UNINITIALIZED
- INITIALIZING
- READY
- IN_TRANSACTION
- COMMITTING_TRANSACTION
- ABORTING_TRANSACTION
- ABORTABLE_ERROR
- FATAL_ERROR
Valid Transitions¶
| Source (Current) State | Target States | transitionTo |
|---|---|---|
| ABORTABLE_ERROR | | |
| ABORTING_TRANSACTION | | beginAbort |
| COMMITTING_TRANSACTION | | beginCommit |
| IN_TRANSACTION | | beginTransaction |
| INITIALIZING | READY | |
| READY | | |
| UNINITIALIZED | INITIALIZING | resetIdempotentProducerId |
| any state | FATAL_ERROR | |
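A transition check of this kind can be sketched as a small state machine. The sketch below encodes only the transitions the table spells out in full (UNINITIALIZED to INITIALIZING, INITIALIZING to READY, and any state to FATAL_ERROR). Every other transition is rejected rather than guessed. `TxnState` and `TransitionSketch` are hypothetical names, and this is not Kafka's implementation.

```java
enum TxnState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
    COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
}

final class TransitionSketch {
    private TxnState current = TxnState.UNINITIALIZED;

    // Only the fully specified rows of the table are encoded here.
    static boolean isTransitionValid(TxnState source, TxnState target) {
        if (target == TxnState.FATAL_ERROR) {
            return true; // any state -> FATAL_ERROR
        }
        switch (source) {
            case UNINITIALIZED:
                return target == TxnState.INITIALIZING;
            case INITIALIZING:
                return target == TxnState.READY;
            default:
                return false; // rows with unknown targets are deliberately left out
        }
    }

    void transitionTo(TxnState target) {
        if (!isTransitionValid(current, target)) {
            throw new IllegalStateException(
                "Invalid transition from " + current + " to " + target);
        }
        current = target;
    }

    TxnState current() {
        return current;
    }
}
```

Rejecting an invalid transition with an exception, instead of ignoring it, keeps programming errors visible at the call site.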
beginAbort¶
TransactionalRequestResult beginAbort()
beginAbort...FIXME
beginAbort is used when:
- KafkaProducer is requested to abortTransaction
- Sender is requested to run (and is shutting down)
beginCommit¶
TransactionalRequestResult beginCommit()
beginCommit...FIXME
beginCommit is used when:
- KafkaProducer is requested to commitTransaction
beginCompletingTransaction¶
TransactionalRequestResult beginCompletingTransaction(
TransactionResult transactionResult)
beginCompletingTransaction...FIXME
beginCompletingTransaction is used when:
- TransactionManager is requested to beginCommit and beginAbort
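The delegation described here, where beginCommit and beginAbort share beginCompletingTransaction, can be sketched as follows. The state changes assume that the transitions table pairs beginCommit with COMMITTING_TRANSACTION and beginAbort with ABORTING_TRANSACTION as the state being entered. `Result` is a hypothetical stand-in for TransactionResult, and the methods return it directly instead of a TransactionalRequestResult. What the real method does internally is not covered.

```java
final class CompletingTransactionSketch {
    // Hypothetical stand-in for TransactionResult.
    enum Result { COMMIT, ABORT }

    enum State { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION }

    private State state = State.IN_TRANSACTION; // assumed starting state

    Result beginCommit() {
        state = State.COMMITTING_TRANSACTION;
        return beginCompletingTransaction(Result.COMMIT);
    }

    Result beginAbort() {
        state = State.ABORTING_TRANSACTION;
        return beginCompletingTransaction(Result.ABORT);
    }

    // Shared path for both callers; the sketch simply echoes the requested result.
    private Result beginCompletingTransaction(Result result) {
        return result;
    }

    State state() {
        return state;
    }
}
```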
beginTransaction¶
void beginTransaction()
beginTransaction makes sure that the producer is transactional and transitions to the IN_TRANSACTION state.
beginTransaction is used when:
- KafkaProducer is requested to beginTransaction
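The two steps of beginTransaction (a transactional check followed by a state change) can be sketched as below. `BeginTransactionSketch` is a hypothetical class. Starting in READY, treating a null transactionalId as "not transactional", and the exception type are all assumptions of this sketch, not confirmed details of TransactionManager.

```java
final class BeginTransactionSketch {
    enum State { READY, IN_TRANSACTION }

    private final String transactionalId; // null is assumed to mean "not transactional"
    private State state = State.READY;    // assumed starting state

    BeginTransactionSketch(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    boolean isTransactional() {
        return transactionalId != null;
    }

    void beginTransaction() {
        // Step 1: make sure the producer is transactional.
        if (!isTransactional()) {
            throw new IllegalStateException(
                "Cannot begin a transaction without a transactional.id");
        }
        // Step 2: move to IN_TRANSACTION.
        state = State.IN_TRANSACTION;
    }

    State state() {
        return state;
    }
}
```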
initializeTransactions¶
TransactionalRequestResult initializeTransactions() // (1)
TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch)
(1) Uses ProducerIdAndEpoch.NONE
initializeTransactions...FIXME
initializeTransactions is used when:
- KafkaProducer is requested to initTransactions
- TransactionManager is requested to beginCompletingTransaction
isTransactional¶
boolean isTransactional()
isTransactional is true when the transactional.id configuration property is defined for the producer (that is, a transactionalId was given when the TransactionManager was created).
maybeAddPartitionToTransaction¶
void maybeAddPartitionToTransaction(
TopicPartition topicPartition)
maybeAddPartitionToTransaction...FIXME
maybeAddPartitionToTransaction is used when:
- KafkaProducer is requested to doSend
sendOffsetsToTransaction¶
TransactionalRequestResult sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata)
sendOffsetsToTransaction...FIXME
sendOffsetsToTransaction is used when:
- KafkaProducer is requested to sendOffsetsToTransaction