RecordAccumulator¶
Creating Instance¶
RecordAccumulator
takes the following to be created:
- LogContext
- batch.size
- CompressionType
- linger.ms
- retry.backoff.ms
- configureDeliveryTimeout
- Metrics
- Name of the Metrics Group
- Time
- ApiVersions
- TransactionManager
- BufferPool
RecordAccumulator
is created along with KafkaProducer.
TransactionManager¶
RecordAccumulator
is given a TransactionManager when created.
RecordAccumulator
uses the TransactionManager
when requested for the following:
- reenqueue
- splitAndReenqueue
- insertInSequenceOrder
- drain (drainBatchesForOneNode and shouldStopDrainBatchesForPartition)
- abortUndrainedBatches
appendsInProgress Counter¶
RecordAccumulator
creates an AtomicInteger
(Java) for the appendsInProgress
internal counter when created.
appendsInProgress
simply marks a single execution of append (and is incremented at the beginning and decremented right at the end).
appendsInProgress
is used when appendsInProgress.
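The bracketing described above (increment on entry, decrement in a finally block) can be sketched as follows. This is a simplified illustration with assumed names, not Kafka's actual class:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch (not Kafka's actual code) of the appendsInProgress
// guard: the counter brackets every append so that abortIncompleteBatches
// can tell whether any thread is still appending.
class AppendGuard {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    void append(Runnable work) {
        appendsInProgress.incrementAndGet();     // mark append started
        try {
            work.run();                          // the actual record append
        } finally {
            appendsInProgress.decrementAndGet(); // unmark, even on failure
        }
    }

    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }
}
```

The finally block matters: the counter is decremented even when the append fails, so an exception cannot leave the counter permanently above 0.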
appendsInProgress¶
boolean appendsInProgress()
appendsInProgress
indicates whether the appendsInProgress counter is above 0.
appendsInProgress
is used when abortIncompleteBatches.
flushesInProgress Counter¶
RecordAccumulator
creates an AtomicInteger
(Java) for the flushesInProgress
internal counter when created.
flushesInProgress
is incremented when beginFlush and decremented when awaitFlushCompletion.
flushesInProgress
is used when flushInProgress.
flushInProgress¶
boolean flushInProgress()
flushInProgress
indicates whether the flushesInProgress counter is above 0.
flushInProgress
is used when:
RecordAccumulator
is requested to ready
Sender
is requested to maybeSendAndPollTransactionalRequest
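The flushesInProgress bookkeeping above can be sketched as below; the actual waiting on incomplete batches in awaitFlushCompletion is elided, and the class is a simplified stand-in, not Kafka's code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the flushesInProgress counter: beginFlush
// increments it, awaitFlushCompletion decrements it in a finally block,
// and flushInProgress reports whether any flush is pending.
class FlushTracker {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    void awaitFlushCompletion() {
        try {
            // wait for all incomplete batches to complete (elided)
        } finally {
            flushesInProgress.decrementAndGet(); // decrement even if waiting fails
        }
    }

    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }
}
```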
Appending Record¶
RecordAppendResult append(
TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs)
append
...FIXME
append
is used when:
KafkaProducer
is requested to send a record (and doSend)
tryAppend¶
RecordAppendResult tryAppend(
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
Deque<ProducerBatch> deque,
long nowMs)
tryAppend
...FIXME
ready¶
ReadyCheckResult ready(
Cluster cluster,
long nowMs)
ready
determines the nodes with partitions that have data ready to send (as a ReadyCheckResult).
ready
...FIXME
ready
is used when:
Sender
is requested to sendProducerData
beginFlush¶
void beginFlush()
beginFlush
atomically increments the flushesInProgress counter.
beginFlush
is used when:
KafkaProducer
is requested to flush
Sender
is requested to maybeSendAndPollTransactionalRequest
awaitFlushCompletion¶
void awaitFlushCompletion()
awaitFlushCompletion
...FIXME
awaitFlushCompletion
is used when:
KafkaProducer
is requested to flush
splitAndReenqueue¶
int splitAndReenqueue(
ProducerBatch bigBatch)
splitAndReenqueue
...FIXME
splitAndReenqueue
is used when:
Sender
is requested to completeBatch
deallocate¶
void deallocate(
ProducerBatch batch)
deallocate
...FIXME
deallocate
is used when:
RecordAccumulator
is requested to abortBatches and abortUndrainedBatches
Sender
is requested to maybeRemoveAndDeallocateBatch
abortBatches¶
void abortBatches() // (1)
void abortBatches(
RuntimeException reason)
1. Uses a KafkaException
abortBatches
...FIXME
abortBatches
is used when:
RecordAccumulator
is requested to abortIncompleteBatches
Sender
is requested to maybeAbortBatches
abortIncompleteBatches¶
void abortIncompleteBatches()
abortIncompleteBatches
requests abortBatches repeatedly for as long as appendsInProgress indicates that some thread is still appending. abortIncompleteBatches
then requests abortBatches one last time (in case the last appending thread added a new batch after no thread was appending).
In the end, abortIncompleteBatches
clears the batches registry.
abortIncompleteBatches
is used when:
Sender
is requested to run (and forceClose)
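The abort-then-recheck control flow of abortIncompleteBatches can be sketched as below. abortBatches and the appending threads are stubbed out (each appendsInProgress check "finishes" one simulated in-flight append), so this only illustrates the loop shape:

```java
// Sketch of the abortIncompleteBatches control flow with stubbed
// collaborators; not Kafka's actual code.
class AbortLoop {
    int abortRounds = 0;          // how many times abortBatches ran
    private int inFlightAppends;  // simulated appendsInProgress counter

    AbortLoop(int inFlightAppends) {
        this.inFlightAppends = inFlightAppends;
    }

    private void abortBatches() {
        abortRounds++;
    }

    private boolean appendsInProgress() {
        // each check "completes" one simulated append
        return inFlightAppends-- > 0;
    }

    void abortIncompleteBatches() {
        // Abort repeatedly while some thread may still be appending.
        do {
            abortBatches();
        } while (appendsInProgress());
        // One final round: the last appending thread may have created a
        // new batch after the loop condition was checked.
        abortBatches();
    }
}
```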
abortUndrainedBatches¶
void abortUndrainedBatches(
RuntimeException reason)
abortUndrainedBatches
...FIXME
abortUndrainedBatches
is used when:
Sender
is requested to maybeSendAndPollTransactionalRequest
Incomplete (Pending) Batches¶
RecordAccumulator
creates an IncompleteBatches
for incomplete
internal registry of pending batches when created.
RecordAccumulator
uses the IncompleteBatches
when:
- append (to add a new ProducerBatch)
- splitAndReenqueue (to add a new ProducerBatch)
- deallocate (to remove a ProducerBatch)
- awaitFlushCompletion, abortBatches and abortUndrainedBatches (to copy all ProducerBatches)
hasIncomplete¶
boolean hasIncomplete()
hasIncomplete
is true
when the incomplete registry is not empty.
hasIncomplete
is used when:
Sender
is requested to maybeSendAndPollTransactionalRequest and maybeAbortBatches
In-Progress Batches¶
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
RecordAccumulator
creates a ConcurrentMap
(Java) for the batches
internal registry of in-progress ProducerBatches (per TopicPartition).
RecordAccumulator
adds a new ArrayDeque
(Java) when getOrCreateDeque.
batches
is used when:
getOrCreateDeque¶
Deque<ProducerBatch> getOrCreateDeque(
TopicPartition tp)
getOrCreateDeque
...FIXME
getOrCreateDeque
is used when:
RecordAccumulator
is requested to append, reenqueue, splitAndReenqueue
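The get-or-create pattern over the batches registry can be sketched with putIfAbsent; String stands in for TopicPartition and ProducerBatch here to keep the example self-contained, so this is an assumed simplification rather than Kafka's code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of a per-partition deque registry. Under concurrent calls,
// putIfAbsent guarantees all threads end up sharing one deque per key.
class BatchRegistry {
    private final ConcurrentMap<String, Deque<String>> batches =
        new ConcurrentHashMap<>();

    Deque<String> getOrCreateDeque(String tp) {
        Deque<String> d = batches.get(tp);
        if (d != null) return d;               // fast path: deque exists
        d = new ArrayDeque<>();
        Deque<String> previous = batches.putIfAbsent(tp, d);
        // If another thread won the race, use its deque instead of ours.
        return previous != null ? previous : d;
    }
}
```

The optimistic get before putIfAbsent avoids allocating a throwaway ArrayDeque on the common path where the deque already exists.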
reenqueue¶
void reenqueue(
ProducerBatch batch,
long now)
reenqueue
...FIXME
reenqueue
is used when:
Sender
is requested to reenqueueBatch
insertInSequenceOrder¶
void insertInSequenceOrder(
Deque<ProducerBatch> deque,
ProducerBatch batch)
insertInSequenceOrder
...FIXME
insertInSequenceOrder
is used when:
RecordAccumulator
is requested to reenqueue and splitAndReenqueue
drain¶
Map<Integer, List<ProducerBatch>> drain(
Cluster cluster,
Set<Node> nodes,
int maxSize,
long now)
drain
...FIXME
drain
is used when:
Sender
is requested to sendProducerData
drainBatchesForOneNode¶
List<ProducerBatch> drainBatchesForOneNode(
Cluster cluster,
Node node,
int maxSize,
long now)
drainBatchesForOneNode
...FIXME
shouldStopDrainBatchesForPartition¶
boolean shouldStopDrainBatchesForPartition(
ProducerBatch first,
TopicPartition tp)
shouldStopDrainBatchesForPartition
...FIXME