RecordAccumulator¶
RecordAccumulator accumulates records (as ProducerBatches) to be sent to a Kafka cluster and uses a BufferPool to manage the underlying buffer memory.
Creating Instance¶
RecordAccumulator takes the following to be created:
- LogContext
- batch.size
- CompressionType
- linger.ms
- retry.backoff.ms
- configureDeliveryTimeout
- Metrics
- Name of the Metrics Group
- Time
- ApiVersions
- TransactionManager
- BufferPool
RecordAccumulator is created along with KafkaProducer.
BufferPool¶
RecordAccumulator is given a BufferPool when created.
All the metrics of RecordAccumulator report on the performance of the BufferPool.
The BufferPool is used to allocate buffer memory for new record batches (in append) and to release it back (in deallocate).
The BufferPool is closed while RecordAccumulator is requested to close.
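As a rough illustration, the following sketch (an assumption for demonstration purposes, not code taken from RecordAccumulator) exercises the internal BufferPool directly the way RecordAccumulator does: allocate a buffer for a batch, return it, and close the pool. The sizes are illustrative values only.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class BufferPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // 32 MB of buffer memory with a 16 KB poolable (batch) size -- illustrative values
        BufferPool pool = new BufferPool(
            32 * 1024 * 1024, 16 * 1024, new Metrics(), Time.SYSTEM, "producer-metrics");
        // append allocates a buffer for a new ProducerBatch (blocking up to max.block.ms)
        ByteBuffer buffer = pool.allocate(16 * 1024, 1000);
        System.out.println("available bytes: " + pool.availableMemory());
        // deallocate returns the buffer to the pool once the batch is done
        pool.deallocate(buffer);
        // close unblocks any threads still waiting for memory
        pool.close();
    }
}
```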
Metrics¶
RecordAccumulator registers the metrics under the producer-metrics group name.
buffer-available-bytes¶
The total amount of buffer memory that is not being used (either unallocated or in the free list)
availableMemory of the BufferPool
buffer-total-bytes¶
The maximum amount of buffer memory the client can use (whether or not it is currently used)
totalMemory of the BufferPool
waiting-threads¶
The number of user threads blocked waiting for buffer memory to enqueue their records
queued of the BufferPool
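These metrics are registered with the producer's Metrics instance, so they can be read through the public KafkaProducer.metrics() API. A minimal example (the broker address, serializers and class name are assumptions for demonstration):

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class BufferMetricsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        Set<String> names = Set.of(
            "buffer-available-bytes", "buffer-total-bytes", "waiting-threads");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // print the RecordAccumulator/BufferPool metrics described above
            producer.metrics().forEach((name, metric) -> {
                if ("producer-metrics".equals(name.group()) && names.contains(name.name())) {
                    System.out.printf("%s = %s%n", name.name(), metric.metricValue());
                }
            });
        }
    }
}
```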
TransactionManager¶
RecordAccumulator is given a TransactionManager when created.
RecordAccumulator uses the TransactionManager when requested for the following:
- reenqueue
- splitAndReenqueue
- insertInSequenceOrder
- drain (drainBatchesForOneNode and shouldStopDrainBatchesForPartition)
- abortUndrainedBatches
appendsInProgress Counter¶
RecordAccumulator creates an AtomicInteger (Java) for appendsInProgress internal counter when created.
appendsInProgress tracks in-flight executions of append (the counter is incremented at the beginning of append and decremented right at the end).
appendsInProgress is used in the appendsInProgress method (below).
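A minimal sketch (not the actual Kafka code) of the counter pattern described above: the counter is bumped on entry to append and always released in a finally block, so abortIncompleteBatches can tell whether any thread is still appending.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AppendCounterSketch {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    void append(/* record fields elided */) {
        appendsInProgress.incrementAndGet();
        try {
            // ... find or create a ProducerBatch and try to append the record ...
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    // true while at least one append is executing
    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }
}
```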
appendsInProgress¶
boolean appendsInProgress()
appendsInProgress indicates if the appendsInProgress counter is above 0.
appendsInProgress is used when abortIncompleteBatches.
flushesInProgress Counter¶
RecordAccumulator creates an AtomicInteger (Java) for flushesInProgress internal counter when created.
flushesInProgress is incremented when beginFlush and decremented when awaitFlushCompletion.
flushesInProgress is used when flushInProgress.
flushInProgress¶
boolean flushInProgress()
flushInProgress indicates if the flushesInProgress counter is above 0.
flushInProgress is used when:
- RecordAccumulator is requested to ready
- Sender is requested to maybeSendAndPollTransactionalRequest
Appending Record¶
RecordAppendResult append(
String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster)
append increments appendsInProgress.
append setPartition with the given AppendCallbacks.
append finds an in-progress batch (among the deque of ProducerBatches).
append tryAppend.
append...FIXME (there is so much more magic going on yet it doesn't seem as important).
append is used when:
- KafkaProducer is requested to send a record (and doSend)
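For reference, append is only ever reached through the public send API. A minimal, hedged example (the broker address, topic name and class name are assumptions):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // doSend appends the record to the RecordAccumulator; the callback fires
            // once the batch holding the record completes (or fails)
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("appended and sent to " + metadata);
                    }
                });
        }
    }
}
```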
tryAppend¶
RecordAppendResult tryAppend(
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
Deque<ProducerBatch> deque,
long nowMs)
tryAppend...FIXME
ready¶
ReadyCheckResult ready(
Cluster cluster,
long nowMs)
ready determines the nodes whose partitions have data ready to be sent (as a ReadyCheckResult).
ready...FIXME
ready is used when:
- Sender is requested to sendProducerData
beginFlush¶
void beginFlush()
beginFlush atomically increments the flushesInProgress counter.
beginFlush is used when:
- KafkaProducer is requested to flush
- Sender is requested to maybeSendAndPollTransactionalRequest
awaitFlushCompletion¶
void awaitFlushCompletion()
awaitFlushCompletion...FIXME
awaitFlushCompletion is used when:
- KafkaProducer is requested to flush
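In other words, beginFlush and awaitFlushCompletion back the public KafkaProducer.flush API, which blocks until all buffered records have been sent. A minimal example (the broker address, topic name and class name are assumptions):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "value"));
            // flush triggers beginFlush, wakes up the Sender and then blocks in
            // awaitFlushCompletion until every in-flight batch is completed
            producer.flush();
        }
    }
}
```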
splitAndReenqueue¶
int splitAndReenqueue(
ProducerBatch bigBatch)
splitAndReenqueue...FIXME
splitAndReenqueue is used when:
- Sender is requested to completeBatch
deallocate¶
void deallocate(
ProducerBatch batch)
deallocate...FIXME
deallocate is used when:
- RecordAccumulator is requested to abortBatches and abortUndrainedBatches
- Sender is requested to maybeRemoveAndDeallocateBatch
abortBatches¶
void abortBatches() // (1)
void abortBatches(
RuntimeException reason)
1. Uses a KafkaException
abortBatches...FIXME
abortBatches is used when:
- RecordAccumulator is requested to abortIncompleteBatches
- Sender is requested to maybeAbortBatches
abortIncompleteBatches¶
void abortIncompleteBatches()
abortIncompleteBatches keeps calling abortBatches as long as there are appendsInProgress. abortIncompleteBatches then calls abortBatches one last time (after no thread was appending) in case the last appending thread added a new batch.
In the end, abortIncompleteBatches clears the batches registry.
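A minimal sketch (not the actual Kafka code) of the loop just described; abortBatches and the batches registry below are simplified stand-ins, not the real RecordAccumulator members.

```java
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class AbortSketch {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);
    private final Map<String, Deque<byte[]>> batches = new ConcurrentHashMap<>();

    private boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }

    private void abortBatches() {
        // abort and deallocate every incomplete batch (elided)
    }

    void abortIncompleteBatches() {
        // keep aborting while appending threads may still be adding new batches
        do {
            abortBatches();
        } while (appendsInProgress());
        // one final pass after no thread is appending, in case the last
        // appending thread managed to create a new batch
        abortBatches();
        // finally, clear the in-progress batches registry
        batches.clear();
    }
}
```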
abortIncompleteBatches is used when:
- Sender is requested to run (and forceClose)
abortUndrainedBatches¶
void abortUndrainedBatches(
RuntimeException reason)
abortUndrainedBatches...FIXME
abortUndrainedBatches is used when:
- Sender is requested to maybeSendAndPollTransactionalRequest
Incomplete (Pending) Batches¶
RecordAccumulator creates an IncompleteBatches collection for the incomplete internal registry of pending batches when created.
RecordAccumulator uses the IncompleteBatches when:
- append (to add a new ProducerBatch)
- splitAndReenqueue (to add a new ProducerBatch)
- deallocate (to remove a ProducerBatch)
- awaitFlushCompletion, abortBatches and abortUndrainedBatches (to copy all ProducerBatches)
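Conceptually, the registry is a thread-safe set of batches that are still in flight. A minimal sketch of that idea (an illustration under the assumptions above, not the real IncompleteBatches class):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.producer.internals.ProducerBatch;

// Illustrative only: a synchronized set of in-flight batches that supports
// add (append, splitAndReenqueue), remove (deallocate) and a snapshot copy
// (awaitFlushCompletion, abortBatches, abortUndrainedBatches).
class IncompleteBatchesSketch {
    private final Set<ProducerBatch> incomplete = new HashSet<>();

    synchronized void add(ProducerBatch batch) {
        incomplete.add(batch);
    }

    synchronized void remove(ProducerBatch batch) {
        incomplete.remove(batch);
    }

    synchronized List<ProducerBatch> copyAll() {
        return new ArrayList<>(incomplete);
    }

    synchronized boolean isEmpty() {
        return incomplete.isEmpty();
    }
}
```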
hasIncomplete¶
boolean hasIncomplete()
hasIncomplete is true when the incomplete registry is not empty.
hasIncomplete is used when:
- Sender is requested to maybeSendAndPollTransactionalRequest and maybeAbortBatches
In-Progress Batches¶
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
RecordAccumulator creates a ConcurrentMap (Java) for the batches internal registry of in-progress ProducerBatches (per TopicPartition).
RecordAccumulator adds a new ArrayDeque (Java) when getOrCreateDeque.
batches is used whenever RecordAccumulator needs the per-partition deques (e.g. getOrCreateDeque below).
getOrCreateDeque¶
Deque<ProducerBatch> getOrCreateDeque(
TopicPartition tp)
getOrCreateDeque...FIXME
getOrCreateDeque is used when:
- RecordAccumulator is requested to append, reenqueue and splitAndReenqueue
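The idiom behind getOrCreateDeque is lazy, race-free creation of a per-partition deque on a concurrent map. A minimal sketch (ConcurrentHashMap and ArrayDeque here are assumptions standing in for whatever collections the real code uses):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.common.TopicPartition;

class BatchesRegistrySketch {
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches =
        new ConcurrentHashMap<>();

    Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        // computeIfAbsent guarantees a single ArrayDeque wins even if two
        // appending threads race for the same partition
        return batches.computeIfAbsent(tp, ignored -> new ArrayDeque<>());
    }
}
```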
reenqueue¶
void reenqueue(
ProducerBatch batch,
long now)
reenqueue...FIXME
reenqueue is used when:
- Sender is requested to reenqueueBatch
insertInSequenceOrder¶
void insertInSequenceOrder(
Deque<ProducerBatch> deque,
ProducerBatch batch)
insertInSequenceOrder...FIXME
insertInSequenceOrder is used when:
- RecordAccumulator is requested to reenqueue and splitAndReenqueue
drain¶
Map<Integer, List<ProducerBatch>> drain(
Cluster cluster,
Set<Node> nodes,
int maxSize,
long now)
drain...FIXME
drain is used when:
- Sender is requested to sendProducerData
drainBatchesForOneNode¶
List<ProducerBatch> drainBatchesForOneNode(
Cluster cluster,
Node node,
int maxSize,
long now)
drainBatchesForOneNode...FIXME
shouldStopDrainBatchesForPartition¶
boolean shouldStopDrainBatchesForPartition(
ProducerBatch first,
TopicPartition tp)
shouldStopDrainBatchesForPartition...FIXME
registerMetrics¶
void registerMetrics(
Metrics metrics,
String metricGrpName)
registerMetrics registers (adds) the metrics to the given Metrics.
registerMetrics is used when:
- RecordAccumulator is created
Logging¶
Enable ALL logging level for org.apache.kafka.clients.producer.internals.RecordAccumulator logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL
Refer to Logging.