TransactionManager¶
Creating Instance¶
TransactionManager takes the following to be created:
- LogContext
- transactional.id
- transaction.timeout.ms
- retry.backoff.ms
- ApiVersions
TransactionManager is created along with KafkaProducer (with idempotenceEnabled).
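As a rough illustration, the three configuration properties listed above could be set on a producer as follows. This is a minimal sketch using plain java.util.Properties only; the class name ProducerConfigSketch, the helper transactionalProps and all values are assumptions made up for the example, not recommendations.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds producer properties that carry the settings a TransactionManager is given.
    // All values below are illustrative assumptions.
    static Properties transactionalProps() {
        Properties props = new Properties();
        props.setProperty("transactional.id", "demo-tx-id");   // hypothetical ID
        props.setProperty("transaction.timeout.ms", "60000");  // illustrative value
        props.setProperty("retry.backoff.ms", "100");          // illustrative value
        return props;
    }

    public static void main(String[] args) {
        Properties props = transactionalProps();
        System.out.println(props.getProperty("transactional.id"));
    }
}
```

In a real application these properties would be passed to the KafkaProducer constructor together with the usual bootstrap and serializer settings.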
States¶
TransactionManager can be in one of the following states:
- UNINITIALIZED
- INITIALIZING
- READY
- IN_TRANSACTION
- COMMITTING_TRANSACTION
- ABORTING_TRANSACTION
- ABORTABLE_ERROR
- FATAL_ERROR
Valid Transitions¶
Source (Current) State | Target States | transitionTo |
---|---|---|
ABORTABLE_ERROR | | |
ABORTING_TRANSACTION | | beginAbort |
COMMITTING_TRANSACTION | | beginCommit |
IN_TRANSACTION | | beginTransaction |
INITIALIZING | READY | |
READY | | |
UNINITIALIZED | INITIALIZING | resetIdempotentProducerId |
any state | FATAL_ERROR | |
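A transition check over these states can be sketched as a small validation function. Only three rules come from the table (UNINITIALIZED to INITIALIZING, INITIALIZING to READY, and any state to FATAL_ERROR). The remaining rules are assumptions based on the usual transactional producer lifecycle and may differ between Kafka versions. The class name TransitionSketch is hypothetical.

```java
public class TransitionSketch {
    enum State {
        UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
        COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
    }

    // Returns true when this sketch allows moving from source to target.
    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case INITIALIZING:
                return source == State.UNINITIALIZED;          // from the table
            case READY:
                return source == State.INITIALIZING            // from the table
                    || source == State.COMMITTING_TRANSACTION  // assumption
                    || source == State.ABORTING_TRANSACTION;   // assumption
            case IN_TRANSACTION:
                return source == State.READY;                  // assumption
            case COMMITTING_TRANSACTION:
                return source == State.IN_TRANSACTION;         // assumption
            case ABORTING_TRANSACTION:
                return source == State.IN_TRANSACTION          // assumption
                    || source == State.ABORTABLE_ERROR;        // assumption
            case FATAL_ERROR:
                return true;                                   // from the table: any state
            default:
                return false; // other targets are not modeled in this sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(isTransitionValid(State.UNINITIALIZED, State.INITIALIZING)); // true
        System.out.println(isTransitionValid(State.READY, State.COMMITTING_TRANSACTION)); // false
    }
}
```

Switching on the target state keeps each rule in one place: every case lists the source states from which that target may be entered.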
beginAbort¶
TransactionalRequestResult beginAbort()
beginAbort ...FIXME
beginAbort is used when:
- KafkaProducer is requested to abortTransaction
- Sender is requested to run (and is shutting down)
beginCommit¶
TransactionalRequestResult beginCommit()
beginCommit ...FIXME
beginCommit is used when:
- KafkaProducer is requested to commitTransaction
beginCompletingTransaction¶
TransactionalRequestResult beginCompletingTransaction(
TransactionResult transactionResult)
beginCompletingTransaction ...FIXME
beginCompletingTransaction is used when:
- TransactionManager is requested to beginCommit and beginAbort
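The relationship between beginCommit, beginAbort and beginCompletingTransaction can be pictured as two thin entry points that share one completion path. The following is a minimal sketch of that delegation only. The class name CompletionSketch is hypothetical, the String return value stands in for TransactionalRequestResult, and the state changes are an assumption taken from the transitionTo column of the Valid Transitions table.

```java
public class CompletionSketch {
    enum TransactionResult { COMMIT, ABORT }
    enum State { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION }

    // Assumption: the sketch starts inside an open transaction.
    State state = State.IN_TRANSACTION;

    // Shared completion path; here it only reports which result was requested.
    String beginCompletingTransaction(TransactionResult transactionResult) {
        return "completing with " + transactionResult;
    }

    // Commit entry point: changes state, then delegates with COMMIT.
    String beginCommit() {
        state = State.COMMITTING_TRANSACTION;
        return beginCompletingTransaction(TransactionResult.COMMIT);
    }

    // Abort entry point: changes state, then delegates with ABORT.
    String beginAbort() {
        state = State.ABORTING_TRANSACTION;
        return beginCompletingTransaction(TransactionResult.ABORT);
    }

    public static void main(String[] args) {
        System.out.println(new CompletionSketch().beginCommit()); // completing with COMMIT
        System.out.println(new CompletionSketch().beginAbort());  // completing with ABORT
    }
}
```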
beginTransaction¶
void beginTransaction()
beginTransaction makes sure that the producer is transactional and transitions to the IN_TRANSACTION state.
beginTransaction is used when:
- KafkaProducer is requested to beginTransaction
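The two steps that beginTransaction performs (check that the producer is transactional, then change state) can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: the class name BeginTransactionSketch, the exception message, and the READY starting state are made up for the example.

```java
public class BeginTransactionSketch {
    enum State { READY, IN_TRANSACTION }

    // A null transactionalId means the producer is not transactional.
    private final String transactionalId;

    // Assumption: the sketch starts in READY.
    State state = State.READY;

    BeginTransactionSketch(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    // True when a transactional ID was given at creation time.
    boolean isTransactional() {
        return transactionalId != null;
    }

    // Rejects non-transactional producers, then moves to IN_TRANSACTION.
    void beginTransaction() {
        if (!isTransactional()) {
            throw new IllegalStateException("producer is not transactional");
        }
        state = State.IN_TRANSACTION;
    }

    public static void main(String[] args) {
        BeginTransactionSketch tx = new BeginTransactionSketch("demo-tx-id");
        tx.beginTransaction();
        System.out.println(tx.state); // IN_TRANSACTION
    }
}
```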
initializeTransactions¶
TransactionalRequestResult initializeTransactions() // (1)
TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch)
- Uses ProducerIdAndEpoch.NONE
initializeTransactions ...FIXME
initializeTransactions is used when:
- KafkaProducer is requested to initTransactions
- TransactionManager is requested to beginCompletingTransaction
isTransactional¶
boolean isTransactional()
isTransactional is enabled (true) when the transactional.id configuration property is defined for the producer (that is, a transactionalId was given when the TransactionManager was created).
maybeAddPartitionToTransaction¶
void maybeAddPartitionToTransaction(
TopicPartition topicPartition)
maybeAddPartitionToTransaction ...FIXME
maybeAddPartitionToTransaction is used when:
- KafkaProducer is requested to doSend
sendOffsetsToTransaction¶
TransactionalRequestResult sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata)
sendOffsetsToTransaction ...FIXME
sendOffsetsToTransaction is used when:
- KafkaProducer is requested to sendOffsetsToTransaction