TransactionCoordinator

TransactionCoordinator runs on every Kafka broker (BrokerServer or KafkaServer).

Creating Instance

TransactionCoordinator takes the following to be created:

TransactionCoordinator is created using the apply factory method.

Creating TransactionCoordinator

apply(
  config: KafkaConfig,
  replicaManager: ReplicaManager,
  scheduler: Scheduler,
  createProducerIdGenerator: () => ProducerIdGenerator,
  metrics: Metrics,
  metadataCache: MetadataCache,
  time: Time): TransactionCoordinator

apply creates a TransactionConfig.

apply creates a TransactionStateManager (with the brokerId and the other Kafka services).

apply creates a LogContext that uses the following log prefix (with the brokerId):

[TransactionCoordinator id=[brokerId]]
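
As a small illustration of the prefix format above, the following sketch builds the string for a given broker ID. The logPrefix helper and the LogPrefixSketch object are hypothetical names used only for this example; they are not part of Kafka.

```scala
object LogPrefixSketch {
  // Hypothetical helper (not a Kafka API): renders the log prefix
  // described above for the given broker ID.
  def logPrefix(brokerId: Int): String =
    s"[TransactionCoordinator id=$brokerId]"
}
```

For example, LogPrefixSketch.logPrefix(1) yields the prefix for the broker with ID 1.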

apply creates a TransactionMarkerChannelManager.

In the end, apply creates a TransactionCoordinator.

apply is used when:

Starting Up

startup(
  retrieveTransactionTopicPartitionCount: () => Int,
  enableTransactionalIdExpiration: Boolean = true): Unit

startup...FIXME

startup is used when:

onElection

onElection(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Int): Unit

onElection prints out the following INFO message to the logs:

Elected as the txn coordinator for partition [txnTopicPartitionId] at epoch [coordinatorEpoch]

onElection requests the TransactionMarkerChannelManager to removeMarkersForTxnTopicPartition for the given txnTopicPartitionId partition.

In the end, onElection requests the TransactionStateManager to loadTransactionsForTxnTopicPartition.
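
The order of the steps above can be sketched with stand-in types. This is a minimal model, not the Kafka source: the MarkerChannelManager and StateManager traits, the Coordinator class, and the trace helper are all hypothetical, and the parameter list of loadTransactionsForTxnTopicPartition is simplified here as an assumption.

```scala
import scala.collection.mutable.ListBuffer

object OnElectionSketch {
  // Stand-ins for the Kafka services; only the calls named in the text are modeled.
  trait MarkerChannelManager {
    def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit
  }
  trait StateManager {
    // Assumption: simplified parameter list for illustration.
    def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int): Unit
  }

  final class Coordinator(
      markers: MarkerChannelManager,
      state: StateManager,
      info: String => Unit) {

    def onElection(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
      // 1. Report the election at INFO level.
      info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
      // 2. Remove markers for this partition.
      markers.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
      // 3. Load the transaction state for the partition.
      state.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
    }
  }

  // Hypothetical helper: runs onElection against recording stubs
  // and returns the calls in the order they happened.
  def trace(partition: Int, epoch: Int): List[String] = {
    val calls = ListBuffer.empty[String]
    val coordinator = new Coordinator(
      new MarkerChannelManager {
        def removeMarkersForTxnTopicPartition(p: Int): Unit = {
          calls += s"removeMarkers($p)"
          ()
        }
      },
      new StateManager {
        def loadTransactionsForTxnTopicPartition(p: Int, e: Int): Unit = {
          calls += s"loadTransactions($p, $e)"
          ()
        }
      },
      msg => { calls += s"INFO $msg"; () })
    coordinator.onElection(partition, epoch)
    calls.toList
  }
}
```

Calling OnElectionSketch.trace(3, 7) returns the INFO message first, then the marker removal, then the state load, which mirrors the sequence described above.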

onElection is used when:

onResignation

onResignation(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Option[Int]): Unit

onResignation...FIXME

onResignation is used when:

handleInitProducerId

handleInitProducerId(
  transactionalId: String,
  transactionTimeoutMs: Int,
  expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
  responseCallback: InitProducerIdCallback): Unit

When the given transactionalId is undefined (null), handleInitProducerId requests the ProducerIdGenerator to generateProducerId and sends the generated producer ID back (using the given InitProducerIdCallback).

handleInitProducerId requests the TransactionStateManager to getTransactionState for the given transactionalId.

handleInitProducerId prints out the following INFO message to the logs:

Initialized transactionalId [transactionalId] with producerId [producerId] and producer epoch [producerEpoch]
on partition __transaction_state-[partition]

In the end, handleInitProducerId requests the TransactionStateManager to appendTransactionToLog.
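
The null-transactionalId branch described above can be sketched as follows. This is an illustrative model under stated assumptions, not the Kafka implementation: InitResult is a simplified, hypothetical stand-in for what the callback receives, the producer epoch of 0 is an assumption, and the transactional continuation is a placeholder for the state lookup and log append.

```scala
object InitProducerIdSketch {
  // Hypothetical, simplified result passed to the callback.
  final case class InitResult(producerId: Long, producerEpoch: Short)

  // Stand-in for the generator; only generateProducerId is modeled.
  trait ProducerIdGenerator {
    def generateProducerId(): Long
  }

  def handleInitProducerId(
      transactionalId: String,
      generator: ProducerIdGenerator,
      responseCallback: InitResult => Unit)(
      transactional: String => Unit): Unit =
    if (transactionalId == null) {
      // No transactional ID: hand out a fresh producer ID right away.
      // Assumption: the epoch starts at 0 in this case.
      responseCallback(InitResult(generator.generateProducerId(), 0.toShort))
    } else {
      // Placeholder for the transactional path (state lookup, log append).
      transactional(transactionalId)
    }
}
```

Keeping the non-transactional case as an early branch makes it clear that it never touches the TransactionStateManager in this sketch.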

handleInitProducerId is used when:

transactionTopicConfigs

transactionTopicConfigs: Properties

transactionTopicConfigs requests the TransactionStateManager for the transactionTopicConfigs.


transactionTopicConfigs is used when:

  • DefaultAutoTopicCreationManager is requested to creatableTopic (for __transaction_state topic)

onEndTransactionComplete

onEndTransactionComplete(
  txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(
  error: Errors): Unit

onEndTransactionComplete branches on the given error to decide which message to print out to the logs (and at which level).


onEndTransactionComplete is used when:

  • TransactionCoordinator is requested to start up (and schedules the transaction-abort task)

No Errors

For no errors, onEndTransactionComplete prints out the following INFO message to the logs:

Completed rollback of ongoing transaction for transactionalId [transactionalId] due to timeout

Rollback Cancelled

For INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED, CONCURRENT_TRANSACTIONS, onEndTransactionComplete prints out the following DEBUG message to the logs:

Rollback of ongoing transaction for transactionalId [transactionalId] has been cancelled due to error [error]

Rollback Failed

For all other errors, onEndTransactionComplete prints out the following WARN message to the logs:

Rollback of ongoing transaction for transactionalId [transactionalId] failed due to error [error]
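
The three branches above map naturally onto a pattern match. The sketch below uses a small local Err type as a stand-in for Kafka's Errors enum and returns the log level with the message instead of logging; the EndTxnLoggingSketch object, the Err type, and the logLine helper are hypothetical names for this example only.

```scala
object EndTxnLoggingSketch {
  // Local stand-in for the error codes named in the text.
  sealed trait Err
  case object NONE extends Err
  case object INVALID_PRODUCER_ID_MAPPING extends Err
  case object PRODUCER_FENCED extends Err
  case object CONCURRENT_TRANSACTIONS extends Err
  // Example of "any other error" for the fallback branch.
  case object COORDINATOR_NOT_AVAILABLE extends Err

  // Returns (level, message) for the given error.
  def logLine(transactionalId: String, error: Err): (String, String) =
    error match {
      case NONE =>
        ("INFO",
          s"Completed rollback of ongoing transaction for transactionalId $transactionalId due to timeout")
      case INVALID_PRODUCER_ID_MAPPING | PRODUCER_FENCED | CONCURRENT_TRANSACTIONS =>
        ("DEBUG",
          s"Rollback of ongoing transaction for transactionalId $transactionalId has been cancelled due to error $error")
      case other =>
        ("WARN",
          s"Rollback of ongoing transaction for transactionalId $transactionalId failed due to error $other")
    }
}
```

Returning the level and message as a pair keeps the sketch free of any logging dependency and makes the branching easy to check.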

Logging

Enable the ALL logging level for the kafka.coordinator.transaction.TransactionCoordinator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=ALL

Refer to Logging.