TransactionStateManager¶
Creating Instance¶
TransactionStateManager
takes the following to be created:
- Broker ID
-
Scheduler
- ReplicaManager
- TransactionConfig
-
Time
-
Metrics
TransactionStateManager
is created when:
TransactionCoordinator
utility is used to create a TransactionCoordinator
Starting Up¶
startup(
retrieveTransactionTopicPartitionCount: () => Int,
enableTransactionalIdExpiration: Boolean = true): Unit
startup
...FIXME
startup
is used when:
TransactionCoordinator
is requested to start up
enableTransactionalIdExpiration¶
enableTransactionalIdExpiration(): Unit
enableTransactionalIdExpiration
...FIXME
appendTransactionToLog¶
appendTransactionToLog(
transactionalId: String,
coordinatorEpoch: Int,
newMetadata: TxnTransitMetadata,
responseCallback: Errors => Unit,
retryOnError: Errors => Boolean = _ => false): Unit
appendTransactionToLog
generates the key and the value (of the record to represent the transaction in the topic) based on the given transactionalId
and the TxnTransitMetadata
, respectively.
appendTransactionToLog
...FIXME
appendTransactionToLog
requests the ReplicaManager to appendRecords (with -1
acks, internalTopicsAllowed
enabled annd Coordinator
origin) and prints out the following TRACE message to the logs:
Appending new metadata [newMetadata] for transaction id [transactionalId] with coordinator epoch [coordinatorEpoch] to the local transaction log
appendTransactionToLog
is used when:
TransactionCoordinator
is requested to handleInitProducerId, handleAddPartitionsToTransaction, endTransactionTransactionMarkerChannelManager
is requested totryAppendToLog
partitionFor¶
partitionFor(
transactionalId: String): Int
partitionFor
calculates the partition for the given transactionalId
.
partitionFor
gets the absolute value of the hashCode
of the transactionalId
string modulo the number of partitions of the __transaction_state
topic.
partitionFor
is used when:
TransactionStateManager
is requested to appendTransactionToLog, enableTransactionalIdExpiration, getAndMaybeAddTransactionStateTransactionCoordinator
is requested to handleInitProducerIdTransactionMarkerChannelManager
is requested toaddTxnMarkersToBrokerQueue
loadTransactionsForTxnTopicPartition¶
loadTransactionsForTxnTopicPartition(
partitionId: Int,
coordinatorEpoch: Int,
sendTxnMarkers: SendTxnMarkersCallback): Unit
loadTransactionsForTxnTopicPartition
...FIXME
loadTransactionsForTxnTopicPartition
is used when:
TransactionCoordinator
is requested to onElection
removeTransactionsForTxnTopicPartition¶
removeTransactionsForTxnTopicPartition(
partitionId: Int): Unit
removeTransactionsForTxnTopicPartition(
partitionId: Int,
coordinatorEpoch: Int): Unit
removeTransactionsForTxnTopicPartition
...FIXME
removeTransactionsForTxnTopicPartition
is used when:
TransactionCoordinator
is requested to onResignation
transactionTopicConfigs¶
transactionTopicConfigs: Properties
Property Name | Property Value |
---|---|
cleanup.policy | compact |
compression.type | uncompressed |
min.insync.replicas | transaction.state.log.min.isr |
segment.bytes | transaction.state.log.segment.bytes |
unclean.leader.election.enable | false |
transactionTopicConfigs
is used when:
TransactionCoordinator
is requested to transactionTopicConfigs