Skip to content

TransactionCoordinator

TransactionCoordinator runs on every Kafka broker (BrokerServer or KafkaServer).

Creating Instance

TransactionCoordinator takes the following to be created:

TransactionCoordinator is created using apply factory.

Creating TransactionCoordinator

apply(
  config: KafkaConfig,
  replicaManager: ReplicaManager,
  scheduler: Scheduler,
  createProducerIdGenerator: () => ProducerIdGenerator,
  metrics: Metrics,
  metadataCache: MetadataCache,
  time: Time): TransactionCoordinator

apply creates a TransactionConfig.

apply creates a TransactionStateManager (with the brokerId and the other Kafka services).

apply creates a LogContext that uses the following log prefix (with the brokerId):

[TransactionCoordinator id=[brokerId]]

apply creates a TransactionMarkerChannelManager.

In the end, apply creates a TransactionCoordinator.

apply is used when:

Starting Up

startup(
  retrieveTransactionTopicPartitionCount: () => Int,
  enableTransactionalIdExpiration: Boolean = true): Unit

startup...FIXME

startup is used when:

onElection

onElection(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Int): Unit

onElection prints out the following INFO message to the logs:

Elected as the txn coordinator for partition [txnTopicPartitionId] at epoch [coordinatorEpoch]

onElection requests the TransactionMarkerChannelManager to removeMarkersForTxnTopicPartition for the given txnTopicPartitionId partition.

In the end, onElection requests the TransactionStateManager to loadTransactionsForTxnTopicPartition.

onElection is used when:

onResignation

onResignation(
  txnTopicPartitionId: Int,
  coordinatorEpoch: Option[Int]): Unit

onResignation...FIXME

onResignation is used when:

handleInitProducerId

handleInitProducerId(
  transactionalId: String,
  transactionTimeoutMs: Int,
  expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
  responseCallback: InitProducerIdCallback): Unit

For transactionalId undefined (null), handleInitProducerId requests the ProducerIdGenerator to generateProducerId and sends it back (using the given InitProducerIdCallback).

handleInitProducerId requests the TransactionStateManager to getTransactionState for the given transactionalId.

handleInitProducerId prints out the following INFO message to the logs:

Initialized transactionalId [transactionalId] with producerId [producerId] and producer epoch [producerEpoch]
on partition __transaction_state-[partition]

In the end, handleInitProducerId requests the TransactionStateManager to appendTransactionToLog.

handleInitProducerId is used when:

Logging

Enable ALL logging level for kafka.coordinator.transaction.TransactionCoordinator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=ALL

Refer to Logging.

Back to top