TransactionCoordinator¶
TransactionCoordinator runs on every Kafka broker (BrokerServer or KafkaServer).
Creating Instance¶
TransactionCoordinator takes the following to be created:
- Broker Id
- TransactionConfig
- Scheduler
- createProducerIdGenerator function (() => ProducerIdGenerator)
- TransactionStateManager
- TransactionMarkerChannelManager
- Time
- LogContext
TransactionCoordinator is created using the apply factory.
Creating TransactionCoordinator¶
apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
scheduler: Scheduler,
createProducerIdGenerator: () => ProducerIdGenerator,
metrics: Metrics,
metadataCache: MetadataCache,
time: Time): TransactionCoordinator
apply creates a TransactionConfig.
apply creates a TransactionStateManager (with the brokerId and the other Kafka services).
apply creates a LogContext that uses the following log prefix (with the brokerId):
[TransactionCoordinator id=[brokerId]]
apply creates a TransactionMarkerChannelManager.
In the end, apply creates a TransactionCoordinator.
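The order of the steps above can be sketched as follows. This is a minimal illustration, not the Kafka implementation: every type inside `ApplyFactorySketch` is a hypothetical stand-in that only borrows the name of the real class, and the real constructors take more arguments (replica manager, metrics, metadata cache, time and so on).

```scala
object ApplyFactorySketch {
  // Stand-in types for illustration only (assumptions, not the Kafka classes).
  final case class TransactionConfig()
  final case class TransactionStateManager(brokerId: Int, config: TransactionConfig)
  final case class LogContext(logPrefix: String)
  final case class TransactionMarkerChannelManager(txnStateManager: TransactionStateManager)
  final case class TransactionCoordinator(
      brokerId: Int,
      txnConfig: TransactionConfig,
      txnManager: TransactionStateManager,
      txnMarkerChannelManager: TransactionMarkerChannelManager,
      logContext: LogContext)

  // Follows the order of steps described for the apply factory.
  def apply(brokerId: Int): TransactionCoordinator = {
    val txnConfig = TransactionConfig()
    val txnStateManager = TransactionStateManager(brokerId, txnConfig)
    // Log prefix as given in the text, with the broker id filled in.
    val logContext = LogContext(s"[TransactionCoordinator id=$brokerId]")
    val txnMarkerChannelManager = TransactionMarkerChannelManager(txnStateManager)
    TransactionCoordinator(brokerId, txnConfig, txnStateManager, txnMarkerChannelManager, logContext)
  }
}
```

Calling `ApplyFactorySketch(7)` yields a stand-in coordinator whose log context carries the prefix for broker 7.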
apply is used when:
Starting Up¶
startup(
retrieveTransactionTopicPartitionCount: () => Int,
enableTransactionalIdExpiration: Boolean = true): Unit
startup ...FIXME
startup is used when:
onElection¶
onElection(
txnTopicPartitionId: Int,
coordinatorEpoch: Int): Unit
onElection prints out the following INFO message to the logs:
Elected as the txn coordinator for partition [txnTopicPartitionId] at epoch [coordinatorEpoch]
onElection requests the TransactionMarkerChannelManager to removeMarkersForTxnTopicPartition for the given txnTopicPartitionId partition.
In the end, onElection requests the TransactionStateManager to loadTransactionsForTxnTopicPartition.
onElection is used when:
- RequestHandlerHelper is requested to onLeadershipChange
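The sequence that onElection follows can be sketched with stand-ins that record each call. This is an illustration under stated assumptions: `MarkerChannelManager` and `StateManager` are hypothetical stand-ins for TransactionMarkerChannelManager and TransactionStateManager, and the argument list of loadTransactionsForTxnTopicPartition is simplified here because the text does not give it.

```scala
import scala.collection.mutable.ListBuffer

object OnElectionSketch {
  // Records log lines and calls in order so the sequence can be inspected.
  val calls: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical stand-in for TransactionMarkerChannelManager.
  object MarkerChannelManager {
    def removeMarkersForTxnTopicPartition(partition: Int): Unit =
      calls += s"removeMarkersForTxnTopicPartition($partition)"
  }

  // Hypothetical stand-in for TransactionStateManager (simplified arguments).
  object StateManager {
    def loadTransactionsForTxnTopicPartition(partition: Int, epoch: Int): Unit =
      calls += s"loadTransactionsForTxnTopicPartition($partition, $epoch)"
  }

  def onElection(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
    // 1. INFO message
    calls += s"INFO Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch"
    // 2. Drop pending markers for the partition
    MarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
    // 3. Load the transaction state for the partition
    StateManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch)
  }
}
```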
onResignation¶
onResignation(
txnTopicPartitionId: Int,
coordinatorEpoch: Option[Int]): Unit
onResignation ...FIXME
onResignation is used when:
- KafkaApis is requested to handleStopReplicaRequest
- RequestHandlerHelper is requested to onLeadershipChange
handleInitProducerId¶
handleInitProducerId(
transactionalId: String,
transactionTimeoutMs: Int,
expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
responseCallback: InitProducerIdCallback): Unit
For transactionalId undefined (null), handleInitProducerId requests the ProducerIdGenerator to generateProducerId and sends it back (using the given InitProducerIdCallback).
handleInitProducerId requests the TransactionStateManager to getTransactionState for the given transactionalId.
handleInitProducerId prints out the following INFO message to the logs:
Initialized transactionalId [transactionalId] with producerId [producerId] and producer epoch [producerEpoch] on partition __transaction_state-[partition]
In the end, handleInitProducerId requests the TransactionStateManager to appendTransactionToLog.
handleInitProducerId is used when:
- KafkaApis is requested to handleInitProducerIdRequest
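The branch for an undefined (null) transactionalId can be sketched as below. This covers only that one branch and rests on assumptions: `CountingProducerIdGenerator` is a hypothetical stand-in for ProducerIdGenerator, and the callback is reduced to a function of the producer id, whereas the real InitProducerIdCallback receives a richer result.

```scala
object InitProducerIdSketch {
  // Hypothetical stand-in for ProducerIdGenerator: hands out increasing ids.
  final class CountingProducerIdGenerator(start: Long) {
    private var next: Long = start
    def generateProducerId(): Long = {
      val id = next
      next += 1
      id
    }
  }

  // Handles only the null-transactionalId branch.
  // Returns true when the request was answered on this branch,
  // false when the caller has to continue with the transactional path.
  def handleNullTransactionalId(
      transactionalId: String,
      generator: CountingProducerIdGenerator,
      responseCallback: Long => Unit): Boolean =
    if (transactionalId == null) {
      responseCallback(generator.generateProducerId())
      true
    } else {
      false
    }
}
```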
transactionTopicConfigs¶
transactionTopicConfigs: Properties
transactionTopicConfigs requests the TransactionStateManager for the transactionTopicConfigs.
transactionTopicConfigs is used when:
- DefaultAutoTopicCreationManager is requested to creatableTopic (for the __transaction_state topic)
onEndTransactionComplete¶
onEndTransactionComplete(
txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(
error: Errors): Unit
onEndTransactionComplete branches off per the error to print out a message to the logs.
onEndTransactionComplete is used when:
- TransactionCoordinator is requested to start up (and schedules the transaction-abort task)
No Errors¶
For no errors, onEndTransactionComplete prints out the following INFO message to the logs:
Completed rollback of ongoing transaction for transactionalId [transactionalId] due to timeout
Rollback Cancelled¶
For INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED and CONCURRENT_TRANSACTIONS, onEndTransactionComplete prints out the following DEBUG message to the logs:
Rollback of ongoing transaction for transactionalId [transactionalId] has been cancelled due to error [error]
Rollback Failed¶
For all other errors, onEndTransactionComplete prints out the following WARN message to the logs:
Rollback of ongoing transaction for transactionalId [transactionalId] failed due to error [error]
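The three branches (no errors, rollback cancelled, rollback failed) can be sketched as a single pattern match. This is a minimal illustration: `TxnError` is a hypothetical stand-in for Kafka's Errors enum, limited to the cases named in the text plus one arbitrary "other" error, and the function returns the log level and message instead of writing to a logger.

```scala
object EndTransactionLoggingSketch {
  // Hypothetical stand-in for the Errors enum, reduced to what the text mentions.
  sealed trait TxnError
  case object NONE extends TxnError
  case object INVALID_PRODUCER_ID_MAPPING extends TxnError
  case object PRODUCER_FENCED extends TxnError
  case object CONCURRENT_TRANSACTIONS extends TxnError
  case object COORDINATOR_NOT_AVAILABLE extends TxnError // example of "any other error"

  // Returns (log level, message) for the given error.
  def logLine(transactionalId: String, error: TxnError): (String, String) = error match {
    case NONE =>
      ("INFO", s"Completed rollback of ongoing transaction for transactionalId $transactionalId due to timeout")
    case INVALID_PRODUCER_ID_MAPPING | PRODUCER_FENCED | CONCURRENT_TRANSACTIONS =>
      ("DEBUG", s"Rollback of ongoing transaction for transactionalId $transactionalId has been cancelled due to error $error")
    case other =>
      ("WARN", s"Rollback of ongoing transaction for transactionalId $transactionalId failed due to error $other")
  }
}
```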
Logging¶
Enable ALL logging level for kafka.coordinator.transaction.TransactionCoordinator logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=ALL
Refer to Logging.