Demo: Transactional Kafka Producer¶
This demo shows the internals of transactional KafkaProducer that is a Kafka producer with transaction.id defined.
KafkaProducer¶
Start Up¶
Use sbt console
for interactive environment (or IntelliJ IDEA).
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "txn-demo")
// Define transaction.id
val transactionalId = "my-custom-txnId"
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
val producer = new KafkaProducer[String, String](props)
Initialize Transactions¶
producer.initTransactions
Once initialized, the transactional producer must not be initialized again.
scala> producer.initTransactions
org.apache.kafka.common.KafkaException: TransactionalId my-custom-txnId: Invalid transition attempted from state READY to state INITIALIZING
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1078)
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1071)
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$initializeTransactions$1(TransactionManager.java:337)
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1200)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:334)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:329)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:596)
... 31 elided
Start Transaction¶
Next up is starting a transaction using KafkaProducer.beginTransaction
producer.beginTransaction
Transactional sends¶
import org.apache.kafka.clients.producer.ProducerRecord
val topic = "txn-demo"
val record = new ProducerRecord[String, String](topic, "Hello from transactional producer")
producer.send(record)
producer.send(
new ProducerRecord[String, String](topic, "Another hello from txn producer"))
Logs¶
Kafka Producer¶
You should see the following INFO messages in the logs of the Kafka producer:
INFO [Producer clientId=producer-my-custom-txnId, transactionalId=my-custom-txnId] Invoking InitProducerId for the first time in order to acquire a producer ID (org.apache.kafka.clients.producer.internals.TransactionManager)
INFO [Producer clientId=producer-my-custom-txnId, transactionalId=my-custom-txnId] Discovered transaction coordinator localhost:9092 (id: 0 rack: null) (org.apache.kafka.clients.producer.internals.TransactionManager)
INFO [Producer clientId=producer-my-custom-txnId, transactionalId=my-custom-txnId] ProducerId set to 0 with epoch 0 (org.apache.kafka.clients.producer.internals.TransactionManager)
Kafka Cluster¶
You should see the following INFO message in the logs of a Kafka cluster:
INFO [TransactionCoordinator id=0] Initialized transactionalId my-custom-txnId with producerId 0 and producer epoch 0 on partition __transaction_state-20 (kafka.coordinator.transaction.TransactionCoordinator)
The calculation to determine the transactional partition (__transaction_state-20
) is as follows:
Math.abs(transactionalId.hashCode) % 50
Start Up Consumer¶
kcat¶
kcat -C -b localhost -t txn-demo
You should see no records produced yet (since the transaction has not been committed yet).
kafka-console-consumer¶
./bin/kafka-console-consumer.sh \
--bootstrap-server :9092 \
--topic txn-demo \
--from-beginning
Unlike kcat
, kafka-console-consumer
uses read_uncommitted isolation level and so there should be records printed out to the console.
Use --isolation-level
option to set isolation.level configuration property.
Commit Transaction¶
Let's commit the transaction using KafkaProducer.commitTransaction.
producer.commitTransaction
Immediately after committing the transaction you should see the record printed out by the Kafka consumer.