

TransactionCoordinator runs on every Kafka broker (BrokerServer or KafkaServer).

Creating Instance

TransactionCoordinator takes the following to be created:

TransactionCoordinator is created using the apply factory.

Creating TransactionCoordinator

apply(
  config: KafkaConfig,
  replicaManager: ReplicaManager,
  scheduler: Scheduler,
  createProducerIdGenerator: () => ProducerIdGenerator,
  metrics: Metrics,
  metadataCache: MetadataCache,
  time: Time): TransactionCoordinator

apply creates a TransactionConfig.

apply creates a TransactionStateManager (with the brokerId and the other Kafka services).

apply creates a LogContext that uses the following log prefix (with the brokerId):

[TransactionCoordinator id=[brokerId]]

apply creates a TransactionMarkerChannelManager.

In the end, apply creates a TransactionCoordinator.
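
The order of steps in apply can be sketched as follows. This is a minimal illustration, not Kafka's code: the classes inside ApplyFactorySketch are hypothetical stand-ins that keep only the names mentioned above, and the single brokerId parameter replaces the full argument list.

```scala
object ApplyFactorySketch {
  // Hypothetical stand-ins for the Kafka classes named above;
  // the real ones take many more arguments.
  final class TransactionConfig
  final class TransactionStateManager(val brokerId: Int, val txnConfig: TransactionConfig)
  final class TransactionMarkerChannelManager(val txnStateManager: TransactionStateManager)
  final case class LogContext(logPrefix: String)

  final class TransactionCoordinator(
      val txnConfig: TransactionConfig,
      val txnManager: TransactionStateManager,
      val txnMarkerChannelManager: TransactionMarkerChannelManager,
      val logContext: LogContext)

  object TransactionCoordinator {
    // Follows the order of steps described for the apply factory.
    def apply(brokerId: Int): TransactionCoordinator = {
      val txnConfig = new TransactionConfig
      val txnManager = new TransactionStateManager(brokerId, txnConfig)
      val logContext = LogContext(s"[TransactionCoordinator id=$brokerId]")
      val markerManager = new TransactionMarkerChannelManager(txnManager)
      new TransactionCoordinator(txnConfig, txnManager, markerManager, logContext)
    }
  }
}
```

Calling ApplyFactorySketch.TransactionCoordinator(7) in this sketch yields a coordinator whose log prefix carries the broker ID, which is the only part of the wiring a reader can observe directly in the logs.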

apply is used when:

Starting Up

startup(
  retrieveTransactionTopicPartitionCount: () => Int,
  enableTransactionalIdExpiration: Boolean = true): Unit


startup is used when:


onElection(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Int): Unit

onElection prints out the following INFO message to the logs:

Elected as the txn coordinator for partition [txnTopicPartitionId] at epoch [coordinatorEpoch]

onElection requests the TransactionMarkerChannelManager to removeMarkersForTxnTopicPartition for the given txnTopicPartitionId partition.

In the end, onElection requests the TransactionStateManager to loadTransactionsForTxnTopicPartition.
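
The sequence above can be sketched with a recording stand-in. Everything inside OnElectionSketch is hypothetical: the two manager requests are modelled as private methods that only append to an event list, and the parameters passed to loadTransactionsForTxnTopicPartition are an assumption, since the text does not list them.

```scala
import scala.collection.mutable.ListBuffer

object OnElectionSketch {
  // Hypothetical coordinator: every collaborator call is only recorded.
  final class Coordinator {
    val events: ListBuffer[String] = ListBuffer.empty

    private def info(msg: String): Unit = events += s"INFO $msg"

    private def removeMarkersForTxnTopicPartition(partition: Int): Unit =
      events += s"removeMarkersForTxnTopicPartition($partition)"

    // Assumed parameters; the real method's signature is not shown in the text.
    private def loadTransactionsForTxnTopicPartition(partition: Int, epoch: Int): Unit =
      events += s"loadTransactionsForTxnTopicPartition($partition, $epoch)"

    def onElection(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
      info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
      // Pending markers for the partition are dropped before its state is loaded.
      removeMarkersForTxnTopicPartition(txnTopicPartitionId)
      loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
    }
  }
}
```

After new OnElectionSketch.Coordinator().onElection(3, 5), the events buffer holds the INFO message followed by the two requests, in that order.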

onElection is used when:


onResignation(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Option[Int]): Unit


onResignation is used when:


handleInitProducerId(
  transactionalId: String,
  transactionTimeoutMs: Int,
  expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
  responseCallback: InitProducerIdCallback): Unit

When transactionalId is undefined (null), handleInitProducerId requests the ProducerIdGenerator to generateProducerId and sends the generated producer ID back (using the given InitProducerIdCallback).

handleInitProducerId requests the TransactionStateManager to getTransactionState for the given transactionalId.

handleInitProducerId prints out the following INFO message to the logs:

Initialized transactionalId [transactionalId] with producerId [producerId] and producer epoch [producerEpoch]
on partition __transaction_state-[partition]

In the end, handleInitProducerId requests the TransactionStateManager to appendTransactionToLog.
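
The split between the two paths can be sketched as follows. This is a simplified illustration under stated assumptions: InitProducerIdResult, the counter-based ProducerIdGenerator and the Path return value are hypothetical, the producer epoch of 0 for a freshly generated ID is an assumption, and the transactional path is reduced to a marker instead of the state lookup and log append.

```scala
object InitProducerIdSketch {
  // Hypothetical result type handed to the callback.
  final case class InitProducerIdResult(producerId: Long, producerEpoch: Short)

  // Hypothetical generator that hands out consecutive IDs.
  final class ProducerIdGenerator(start: Long) {
    private var next = start
    def generateProducerId(): Long = { val id = next; next += 1; id }
  }

  // Which path the request took (only for illustration).
  sealed trait Path
  case object NonTransactional extends Path
  case object Transactional extends Path

  def handleInitProducerId(
      transactionalId: String,
      generator: ProducerIdGenerator,
      responseCallback: InitProducerIdResult => Unit): Path =
    if (transactionalId == null) {
      // No transactional ID: generate a producer ID and answer immediately.
      // Epoch 0 is an assumption of this sketch.
      responseCallback(InitProducerIdResult(generator.generateProducerId(), 0.toShort))
      NonTransactional
    } else {
      // The coordinator would look up the transaction state and
      // append the updated metadata to the log here (omitted).
      Transactional
    }
}
```

Only the null branch answers the callback in this sketch; in the transactional branch the response would follow the log append.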

handleInitProducerId is used when:


transactionTopicConfigs: Properties

transactionTopicConfigs requests the TransactionStateManager for the transactionTopicConfigs.

transactionTopicConfigs is used when:

  • DefaultAutoTopicCreationManager is requested to creatableTopic (for __transaction_state topic)


onEndTransactionComplete(
  txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(
  error: Errors): Unit

onEndTransactionComplete branches off per the error to print out a message to the logs.

onEndTransactionComplete is used when:

  • TransactionCoordinator is requested to start up (and schedules the transaction-abort task)

No Errors

For no errors, onEndTransactionComplete prints out the following INFO message to the logs:

Completed rollback of ongoing transaction for transactionalId [transactionalId] due to timeout

Rollback Cancelled

For INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED, or CONCURRENT_TRANSACTIONS, onEndTransactionComplete prints out the following DEBUG message to the logs:

Rollback of ongoing transaction for transactionalId [transactionalId] has been cancelled due to error [error]

Rollback Failed

For all other errors, onEndTransactionComplete prints out the following WARN message to the logs:

Rollback of ongoing transaction for transactionalId [transactionalId] failed due to error [error]
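
The three cases above amount to a single pattern match on the error. The sketch below is a minimal illustration: the Errors and Level types are hypothetical stand-ins (Kafka's own Errors enum has many more values), COORDINATOR_NOT_AVAILABLE is included only as an example of "all other errors", and logLine returns the level and message instead of writing to a logger.

```scala
object EndTransactionLoggingSketch {
  // Hypothetical subset of error codes, enough to exercise every branch.
  sealed trait Errors
  object Errors {
    case object NONE extends Errors
    case object INVALID_PRODUCER_ID_MAPPING extends Errors
    case object PRODUCER_FENCED extends Errors
    case object CONCURRENT_TRANSACTIONS extends Errors
    case object COORDINATOR_NOT_AVAILABLE extends Errors
  }

  sealed trait Level
  object Level {
    case object INFO extends Level
    case object DEBUG extends Level
    case object WARN extends Level
  }

  // Returns the log level and message chosen for the given error.
  def logLine(transactionalId: String, error: Errors): (Level, String) = error match {
    case Errors.NONE =>
      (Level.INFO,
        s"Completed rollback of ongoing transaction for transactionalId $transactionalId due to timeout")
    case Errors.INVALID_PRODUCER_ID_MAPPING | Errors.PRODUCER_FENCED | Errors.CONCURRENT_TRANSACTIONS =>
      (Level.DEBUG,
        s"Rollback of ongoing transaction for transactionalId $transactionalId has been cancelled due to error $error")
    case other =>
      (Level.WARN,
        s"Rollback of ongoing transaction for transactionalId $transactionalId failed due to error $other")
  }
}
```

Returning the pair rather than logging directly keeps the branching easy to check in isolation.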


Enable the ALL logging level for the kafka.coordinator.transaction.TransactionCoordinator logger to see what happens inside.

Add the following line to


Refer to Logging.