TransactionCoordinator¶
TransactionCoordinator runs on every Kafka broker (BrokerServer or KafkaServer).
Creating Instance¶
TransactionCoordinator takes the following to be created:
- Broker Id
- TransactionConfig
- Scheduler
- createProducerIdGenerator function (() => ProducerIdGenerator)
- TransactionStateManager
- TransactionMarkerChannelManager
- Time
- LogContext
TransactionCoordinator is created using the apply factory.
Creating TransactionCoordinator¶
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
scheduler: Scheduler,
createProducerIdGenerator: () => ProducerIdGenerator,
metrics: Metrics,
metadataCache: MetadataCache,
time: Time): TransactionCoordinator
apply creates a TransactionConfig.
apply creates a TransactionStateManager (with the brokerId and the other Kafka services).
apply creates a LogContext that uses the following log prefix (with the brokerId):
[TransactionCoordinator id=[brokerId]]
apply creates a TransactionMarkerChannelManager.
In the end, apply creates a TransactionCoordinator.
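The creation order described above can be sketched as follows. This is a minimal illustration only: every type below is a hypothetical stand-in (not the real Kafka class), the factory takes just a broker id instead of the full parameter list, and the log prefix follows the format quoted above.

```scala
object ApplySketch {
  // Hypothetical stand-ins for the real Kafka classes; only the wiring order is illustrated.
  final case class TransactionConfig()
  final case class TransactionStateManager(brokerId: Int, config: TransactionConfig)
  final case class LogContext(prefix: String)
  final case class TransactionMarkerChannelManager(stateManager: TransactionStateManager)
  final case class TransactionCoordinator(
      brokerId: Int,
      config: TransactionConfig,
      stateManager: TransactionStateManager,
      markerChannelManager: TransactionMarkerChannelManager,
      logContext: LogContext)

  // Simplified factory (assumption: a single brokerId parameter replaces KafkaConfig and the other services).
  def apply(brokerId: Int): TransactionCoordinator = {
    val txnConfig = TransactionConfig()
    val stateManager = TransactionStateManager(brokerId, txnConfig)
    val logContext = LogContext(s"[TransactionCoordinator id=$brokerId]")
    val markerChannelManager = TransactionMarkerChannelManager(stateManager)
    TransactionCoordinator(brokerId, txnConfig, stateManager, markerChannelManager, logContext)
  }
}
```

Calling `ApplySketch(7)` in this sketch yields a coordinator whose log prefix is `[TransactionCoordinator id=7]`.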
apply is used when:
Starting Up¶
startup(
retrieveTransactionTopicPartitionCount: () => Int,
enableTransactionalIdExpiration: Boolean = true): Unit
startup...FIXME
startup is used when:
onElection¶
onElection(
txnTopicPartitionId: Int,
coordinatorEpoch: Int): Unit
onElection prints out the following INFO message to the logs:
Elected as the txn coordinator for partition [txnTopicPartitionId] at epoch [coordinatorEpoch]
onElection requests the TransactionMarkerChannelManager to removeMarkersForTxnTopicPartition for the given txnTopicPartitionId partition.
In the end, onElection requests the TransactionStateManager to loadTransactionsForTxnTopicPartition.
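The sequence of steps above can be sketched with stand-in collaborators. The two nested objects and the `calls` buffer are hypothetical (they only record the order of requests), and the parameter list of `loadTransactionsForTxnTopicPartition` is a simplifying assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object OnElectionSketch {
  // Records what happens, in order, so the flow can be inspected; purely illustrative.
  val calls: ArrayBuffer[String] = ArrayBuffer.empty

  // Hypothetical stand-in for TransactionMarkerChannelManager.
  object MarkerChannelManager {
    def removeMarkersForTxnTopicPartition(partition: Int): Unit =
      calls += s"removeMarkers($partition)"
  }

  // Hypothetical stand-in for TransactionStateManager (simplified signature).
  object StateManager {
    def loadTransactionsForTxnTopicPartition(partition: Int, epoch: Int): Unit =
      calls += s"loadTransactions($partition, $epoch)"
  }

  def onElection(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
    // Step 1: the INFO message
    calls += s"INFO Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch"
    // Step 2: drop pending markers for the partition
    MarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
    // Step 3: load the transaction state for the partition
    StateManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
  }
}
```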
onElection is used when:
- RequestHandlerHelper is requested to onLeadershipChange
onResignation¶
onResignation(
txnTopicPartitionId: Int,
coordinatorEpoch: Option[Int]): Unit
onResignation...FIXME
onResignation is used when:
- KafkaApis is requested to handleStopReplicaRequest
- RequestHandlerHelper is requested to onLeadershipChange
handleInitProducerId¶
handleInitProducerId(
transactionalId: String,
transactionTimeoutMs: Int,
expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
responseCallback: InitProducerIdCallback): Unit
For an undefined (null) transactionalId, handleInitProducerId requests the ProducerIdGenerator to generateProducerId and sends the generated producer ID back (using the given InitProducerIdCallback).
handleInitProducerId requests the TransactionStateManager to getTransactionState for the given transactionalId.
handleInitProducerId prints out the following INFO message to the logs:
Initialized transactionalId [transactionalId] with producerId [producerId] and producer epoch [producerEpoch] on partition __transaction_state-[partition]
In the end, handleInitProducerId requests the TransactionStateManager to appendTransactionToLog.
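The first case above (an undefined transactionalId) can be sketched as follows. This is a simplified illustration, not the actual method: `InitProducerIdResult` is a hypothetical reduced result type, the generator is passed in as a plain function, the producer epoch of 0 is an assumption, and the transactional path is deliberately left out.

```scala
object InitProducerIdSketch {
  // Hypothetical, simplified result type; the real callback may carry more information.
  final case class InitProducerIdResult(producerId: Long, producerEpoch: Short)

  def handleInitProducerId(
      transactionalId: String,
      generateProducerId: () => Long,
      responseCallback: InitProducerIdResult => Unit): Unit = {
    if (transactionalId == null) {
      // No transactional id: generate a producer id and send it straight back.
      // Assumption: the fresh producer id is paired with epoch 0.
      responseCallback(InitProducerIdResult(generateProducerId(), 0.toShort))
    } else {
      // The transactional path (state lookup, log append) is not sketched here.
      ()
    }
  }
}
```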
handleInitProducerId is used when:
- KafkaApis is requested to handleInitProducerIdRequest
transactionTopicConfigs¶
transactionTopicConfigs: Properties
transactionTopicConfigs requests the TransactionStateManager for the transactionTopicConfigs.
transactionTopicConfigs is used when:
- DefaultAutoTopicCreationManager is requested to creatableTopic (for __transaction_state topic)
onEndTransactionComplete¶
onEndTransactionComplete(
txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(
error: Errors): Unit
onEndTransactionComplete branches off per the error to print out a message to the logs.
onEndTransactionComplete is used when:
- TransactionCoordinator is requested to start up (and schedules the transaction-abort task)
No Errors¶
For no errors, onEndTransactionComplete prints out the following INFO message to the logs:
Completed rollback of ongoing transaction for transactionalId [transactionalId] due to timeout
Rollback Cancelled¶
For INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED, CONCURRENT_TRANSACTIONS, onEndTransactionComplete prints out the following DEBUG message to the logs:
Rollback of ongoing transaction for transactionalId [transactionalId] has been cancelled due to error [error]
Rollback Failed¶
For all other errors, onEndTransactionComplete prints out the following WARN message to the logs:
Rollback of ongoing transaction for transactionalId [transactionalId] failed due to error [error]
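The three branches above map naturally onto a pattern match. The sketch below is an illustration under assumptions: `Errors` is a small hypothetical stand-in for the real error enum, `describe` is a made-up helper, and it returns the log level and message as a pair instead of writing to a logger.

```scala
object EndTxnCompleteSketch {
  // Hypothetical stand-in for the real Errors enum (a small subset only).
  sealed trait Errors
  case object NONE extends Errors
  case object INVALID_PRODUCER_ID_MAPPING extends Errors
  case object PRODUCER_FENCED extends Errors
  case object CONCURRENT_TRANSACTIONS extends Errors
  case object COORDINATOR_NOT_AVAILABLE extends Errors

  // Returns (level, message) for the given error rather than logging it.
  def describe(transactionalId: String, error: Errors): (String, String) = error match {
    case NONE =>
      ("INFO",
        s"Completed rollback of ongoing transaction for transactionalId $transactionalId due to timeout")
    case INVALID_PRODUCER_ID_MAPPING | PRODUCER_FENCED | CONCURRENT_TRANSACTIONS =>
      ("DEBUG",
        s"Rollback of ongoing transaction for transactionalId $transactionalId has been cancelled due to error $error")
    case other =>
      ("WARN",
        s"Rollback of ongoing transaction for transactionalId $transactionalId failed due to error $other")
  }
}
```

Any error outside the three "cancelled" cases, such as `COORDINATOR_NOT_AVAILABLE` in this sketch, falls through to the WARN branch.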
Logging¶
Enable ALL logging level for kafka.coordinator.transaction.TransactionCoordinator logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=ALL
Refer to Logging.