RecordAccumulator¶
RecordAccumulator
uses BufferPool.
Creating Instance¶
RecordAccumulator
takes the following to be created:
- LogContext
- batch.size
- CompressionType
- linger.ms
- retry.backoff.ms
- configureDeliveryTimeout
- Metrics
- Name of the Metrics Group
- Time
- ApiVersions
- TransactionManager
- BufferPool
RecordAccumulator
is created along with KafkaProducer.
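Since RecordAccumulator is created along with KafkaProducer, the configuration properties in the list above are set on the producer. The following is a minimal sketch that only builds the properties; the class and method names are hypothetical, the values are illustrative, and handing the properties to a KafkaProducer (which requires the kafka-clients library) is left out.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the producer properties
// that the list above names as inputs of RecordAccumulator.
class ProducerConfigSketch {

    // Values are illustrative only, not recommendations.
    static Properties accumulatorRelatedProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");     // upper bound (in bytes) of a single batch
        props.setProperty("linger.ms", "5");          // how long to wait for more records
        props.setProperty("retry.backoff.ms", "100"); // pause before retrying a failed request
        return props;
    }

    public static void main(String[] args) {
        accumulatorRelatedProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```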
BufferPool¶
RecordAccumulator
is given a BufferPool when created.
All the metrics of RecordAccumulator report performance of the BufferPool.
The BufferPool
is used for the following:
The BufferPool
is closed while RecordAccumulator
is requested to close.
Metrics¶
RecordAccumulator
registers the metrics under the producer-metrics group name.
buffer-available-bytes¶
The total amount of buffer memory that is not being used (either unallocated or in the free list)
availableMemory of the BufferPool
buffer-total-bytes¶
The maximum amount of buffer memory the client can use (whether or not it is currently used)
totalMemory of the BufferPool
waiting-threads¶
The number of user threads blocked waiting for buffer memory to enqueue their records
queued of the BufferPool
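The mapping of the three metrics to the BufferPool can be pictured as gauges that read the pool lazily. This is a sketch only: FakeBufferPool and the gauges helper are hypothetical stand-ins that model just the three accessors named above, and the Kafka Metrics API is not used.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

class BufferPoolMetricsSketch {

    // Hypothetical stand-in for BufferPool; only the three accessors named above are modeled.
    static class FakeBufferPool {
        private final long total;
        private long available;
        private int waiters;

        FakeBufferPool(long total) {
            this.total = total;
            this.available = total;
        }

        long availableMemory() { return available; }
        long totalMemory() { return total; }
        int queued() { return waiters; }

        // Simplified allocation: no blocking, no free list.
        void allocate(long bytes) { available -= bytes; }
    }

    // Each gauge reads the pool when asked, so a lookup reflects the current state.
    static Map<String, LongSupplier> gauges(FakeBufferPool pool) {
        Map<String, LongSupplier> m = new LinkedHashMap<>();
        m.put("buffer-available-bytes", pool::availableMemory);
        m.put("buffer-total-bytes", pool::totalMemory);
        m.put("waiting-threads", () -> pool.queued());
        return m;
    }

    public static void main(String[] args) {
        FakeBufferPool pool = new FakeBufferPool(1024);
        pool.allocate(256);
        gauges(pool).forEach((name, g) -> System.out.println(name + "=" + g.getAsLong()));
    }
}
```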
TransactionManager¶
RecordAccumulator
is given a TransactionManager when created.
RecordAccumulator
uses the TransactionManager
when requested for the following:
- reenqueue
- splitAndReenqueue
- insertInSequenceOrder
- drain (drainBatchesForOneNode and shouldStopDrainBatchesForPartition)
- abortUndrainedBatches
appendsInProgress Counter¶
RecordAccumulator
creates an AtomicInteger
(Java) for appendsInProgress
internal counter when created.
appendsInProgress counts the executions of append that are currently in flight (it is incremented at the beginning of append and decremented right at the end).
The appendsInProgress counter is used when appendsInProgress.
appendsInProgress¶
boolean appendsInProgress()
appendsInProgress indicates whether the appendsInProgress counter is above 0.
appendsInProgress is used when abortIncompleteBatches.
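The counter handling described above can be sketched as follows. AppendCounterSketch is a hypothetical class, not the Kafka source, and the try/finally placement is an assumption about how "decremented right at the end" is guaranteed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the counter handling described above.
class AppendCounterSketch {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    // Increment first, decrement at the very end, even if the work throws.
    void append(Runnable work) {
        appendsInProgress.incrementAndGet();
        try {
            work.run(); // placeholder for the actual batching work
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    // True while at least one append call is executing.
    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }

    public static void main(String[] args) {
        AppendCounterSketch acc = new AppendCounterSketch();
        acc.append(() -> System.out.println("inside append: " + acc.appendsInProgress()));
        System.out.println("after append: " + acc.appendsInProgress());
    }
}
```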
flushesInProgress Counter¶
RecordAccumulator
creates an AtomicInteger
(Java) for flushesInProgress
internal counter when created.
flushesInProgress
is incremented when beginFlush and decremented when awaitFlushCompletion.
flushesInProgress
is used when flushInProgress.
flushInProgress¶
boolean flushInProgress()
flushInProgress indicates whether the flushesInProgress counter is above 0.
flushInProgress is used when:
- RecordAccumulator is requested to ready
- Sender is requested to maybeSendAndPollTransactionalRequest
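A minimal sketch of the flushesInProgress counter, under the assumption that it behaves exactly as described in this section. FlushCounterSketch is hypothetical, and endFlush stands in for the decrement that the text attributes to awaitFlushCompletion (whose waiting logic is not modeled).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not the Kafka source.
class FlushCounterSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    // Atomically marks the start of a flush.
    void beginFlush() {
        flushesInProgress.getAndIncrement();
    }

    // Stands in for the decrement done by awaitFlushCompletion.
    void endFlush() {
        flushesInProgress.decrementAndGet();
    }

    // True while at least one flush has begun and not yet completed.
    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }

    public static void main(String[] args) {
        FlushCounterSketch acc = new FlushCounterSketch();
        acc.beginFlush();
        System.out.println("during flush: " + acc.flushInProgress());
        acc.endFlush();
        System.out.println("after flush: " + acc.flushInProgress());
    }
}
```

Because the counter is an integer rather than a boolean flag, overlapping flushes from several threads keep flushInProgress true until the last one completes.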
Appending Record¶
RecordAppendResult append(
String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster)
append
increments appendsInProgress.
append setPartition with the given AppendCallbacks.
append
finds an in-progress batch (among the deque of ProducerBatchs).
append
tryAppend.
append
...FIXME (there is so much more magic going on yet it doesn't seem as important).
append
is used when:
KafkaProducer
is requested to send a record (and doSend)
tryAppend¶
RecordAppendResult tryAppend(
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
Deque<ProducerBatch> deque,
long nowMs)
tryAppend
...FIXME
ready¶
ReadyCheckResult ready(
Cluster cluster,
long nowMs)
ready returns a ReadyCheckResult with the nodes whose partitions have data ready to send.
ready
...FIXME
ready
is used when:
Sender
is requested to sendProducerData
beginFlush¶
void beginFlush()
beginFlush
atomically increments the flushesInProgress counter.
beginFlush
is used when:
- KafkaProducer is requested to flush
- Sender is requested to maybeSendAndPollTransactionalRequest
awaitFlushCompletion¶
void awaitFlushCompletion()
awaitFlushCompletion
...FIXME
awaitFlushCompletion
is used when:
KafkaProducer
is requested to flush
splitAndReenqueue¶
int splitAndReenqueue(
ProducerBatch bigBatch)
splitAndReenqueue
...FIXME
splitAndReenqueue
is used when:
Sender
is requested to completeBatch
deallocate¶
void deallocate(
ProducerBatch batch)
deallocate
...FIXME
deallocate
is used when:
- RecordAccumulator is requested to abortBatches and abortUndrainedBatches
- Sender is requested to maybeRemoveAndDeallocateBatch
abortBatches¶
void abortBatches() // Uses a KafkaException
void abortBatches(
RuntimeException reason)
abortBatches
...FIXME
abortBatches
is used when:
- RecordAccumulator is requested to abortIncompleteBatches
- Sender is requested to maybeAbortBatches
abortIncompleteBatches¶
void abortIncompleteBatches()
abortIncompleteBatches keeps calling abortBatches as long as there are appendsInProgress.
abortIncompleteBatches then calls abortBatches one last time (after no thread is appending, in case the last appending thread added a new batch).
In the end, abortIncompleteBatches clears the batches registry.
abortIncompleteBatches
is used when:
Sender
is requested to run (and forceClose)
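The abort sequence of abortIncompleteBatches can be sketched as a loop followed by a final pass. AbortSketch is a hypothetical class: String keys and values stand in for TopicPartition and ProducerBatch, abortBatches is a placeholder (the text leaves it as FIXME), and the do/while form (at least one pass even when nothing is appending) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not the Kafka source.
class AbortSketch {
    final AtomicInteger appendsInProgress = new AtomicInteger(0);
    final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();
    int abortCalls = 0; // instrumentation for this sketch only

    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }

    // Placeholder: simply drops whatever is queued at the moment.
    void abortBatches() {
        abortCalls++;
        batches.values().forEach(Deque::clear);
    }

    void abortIncompleteBatches() {
        // Keep aborting while some thread may still be appending.
        do {
            abortBatches();
        } while (appendsInProgress());
        // One last pass for a batch added by the last appending thread.
        abortBatches();
        batches.clear();
    }

    public static void main(String[] args) {
        AbortSketch acc = new AbortSketch();
        acc.batches.computeIfAbsent("topic-0", k -> new ArrayDeque<>()).add("batch-1");
        acc.abortIncompleteBatches();
        System.out.println("abort passes: " + acc.abortCalls + ", registry empty: " + acc.batches.isEmpty());
    }
}
```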
abortUndrainedBatches¶
void abortUndrainedBatches(
RuntimeException reason)
abortUndrainedBatches
...FIXME
abortUndrainedBatches
is used when:
Sender
is requested to maybeSendAndPollTransactionalRequest
Incomplete (Pending) Batches¶
RecordAccumulator
creates an IncompleteBatches
for incomplete
internal registry of pending batches when created.
RecordAccumulator uses the IncompleteBatches when:
- append (to add a new ProducerBatch)
- splitAndReenqueue (to add a new ProducerBatch)
- deallocate (to remove a ProducerBatch)
- awaitFlushCompletion, abortBatches and abortUndrainedBatches (to copy all ProducerBatches)
hasIncomplete¶
boolean hasIncomplete()
hasIncomplete
is true
when the incomplete registry is not empty.
hasIncomplete
is used when:
Sender
is requested to maybeSendAndPollTransactionalRequest and maybeAbortBatches
In-Progress Batches¶
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
RecordAccumulator creates a ConcurrentMap (Java) for the batches internal registry of in-progress ProducerBatches (per TopicPartition).
RecordAccumulator
adds a new ArrayDeque
(Java) when getOrCreateDeque.
batches
is used when:
getOrCreateDeque¶
Deque<ProducerBatch> getOrCreateDeque(
TopicPartition tp)
getOrCreateDeque
...FIXME
getOrCreateDeque
is used when:
RecordAccumulator
is requested to append, reenqueue, splitAndReenqueue
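The text leaves getOrCreateDeque as FIXME, so the following is only one plausible get-or-create over a ConcurrentMap, not necessarily what Kafka does. DequeRegistrySketch is hypothetical, and String keys and values stand in for TopicPartition and ProducerBatch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in, not the Kafka source.
class DequeRegistrySketch {
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    Deque<String> getOrCreateDeque(String tp) {
        Deque<String> existing = batches.get(tp);
        if (existing != null) {
            return existing;
        }
        Deque<String> created = new ArrayDeque<>();
        // putIfAbsent returns the deque another thread registered first, if any.
        Deque<String> raced = batches.putIfAbsent(tp, created);
        return raced == null ? created : raced;
    }

    int size() {
        return batches.size();
    }

    public static void main(String[] args) {
        DequeRegistrySketch registry = new DequeRegistrySketch();
        Deque<String> first = registry.getOrCreateDeque("topic-0");
        Deque<String> second = registry.getOrCreateDeque("topic-0");
        System.out.println("same deque reused: " + (first == second));
    }
}
```

Whichever thread wins the putIfAbsent race, all callers end up sharing the same deque for a given partition.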
reenqueue¶
void reenqueue(
ProducerBatch batch,
long now)
reenqueue
...FIXME
reenqueue
is used when:
Sender
is requested to reenqueueBatch
insertInSequenceOrder¶
void insertInSequenceOrder(
Deque<ProducerBatch> deque,
ProducerBatch batch)
insertInSequenceOrder
...FIXME
insertInSequenceOrder
is used when:
RecordAccumulator
is requested to reenqueue and splitAndReenqueue
drain¶
Map<Integer, List<ProducerBatch>> drain(
Cluster cluster,
Set<Node> nodes,
int maxSize,
long now)
drain
...FIXME
drain
is used when:
Sender
is requested to sendProducerData
drainBatchesForOneNode¶
List<ProducerBatch> drainBatchesForOneNode(
Cluster cluster,
Node node,
int maxSize,
long now)
drainBatchesForOneNode
...FIXME
shouldStopDrainBatchesForPartition¶
boolean shouldStopDrainBatchesForPartition(
ProducerBatch first,
TopicPartition tp)
shouldStopDrainBatchesForPartition
...FIXME
registerMetrics¶
void registerMetrics(
Metrics metrics,
String metricGrpName)
registerMetrics
registers (adds) the metrics to the given Metrics.
registerMetrics
is used when:
RecordAccumulator
is created
Logging¶
Enable ALL
logging level for org.apache.kafka.clients.producer.internals.RecordAccumulator
logger to see what happens inside.
Add the following line to log4j.properties
:
log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL
Refer to Logging.