RecordAccumulator

RecordAccumulator uses BufferPool.

Creating Instance

RecordAccumulator takes the following to be created:

RecordAccumulator is created along with KafkaProducer.

BufferPool

RecordAccumulator is given a BufferPool when created.

All the metrics of RecordAccumulator report performance of the BufferPool.

The BufferPool is used for the following:

The BufferPool is closed when RecordAccumulator is requested to close.

Metrics

RecordAccumulator registers the metrics under the producer-metrics group name.

buffer-available-bytes

The total amount of buffer memory that is not being used (either unallocated or in the free list)

availableMemory of the BufferPool

buffer-total-bytes

The maximum amount of buffer memory the client can use (whether or not it is currently used)

totalMemory of the BufferPool

waiting-threads

The number of user threads blocked waiting for buffer memory to enqueue their records

queued of the BufferPool
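
As a rough illustration of how the three metrics map onto the state of a pool, the sketch below models a toy pool in plain Java. ToyBufferPool, tryAllocate and metricsSnapshot are hypothetical names made up for this example and are far simpler than Kafka's BufferPool; only the accessor names (availableMemory, totalMemory, queued) and the metric names come from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a buffer pool (NOT Kafka's BufferPool).
class ToyBufferPool {
    private final long totalMemory;   // upper bound on buffer memory
    private long availableMemory;     // memory not currently handed out
    private int waiters;              // requests that could not be served

    ToyBufferPool(long totalMemory) {
        this.totalMemory = totalMemory;
        this.availableMemory = totalMemory;
    }

    // Hands out memory if possible; otherwise only records a waiter
    // (assumption: a real pool would block the calling thread instead).
    synchronized boolean tryAllocate(long size) {
        if (size <= availableMemory) {
            availableMemory -= size;
            return true;
        }
        waiters++;
        return false;
    }

    synchronized long availableMemory() { return availableMemory; }

    long totalMemory() { return totalMemory; }

    synchronized int queued() { return waiters; }

    // Snapshot keyed by the metric names used in this section.
    synchronized Map<String, Number> metricsSnapshot() {
        Map<String, Number> snapshot = new LinkedHashMap<>();
        snapshot.put("buffer-available-bytes", availableMemory);
        snapshot.put("buffer-total-bytes", totalMemory);
        snapshot.put("waiting-threads", waiters);
        return snapshot;
    }
}
```

With a pool of 100 bytes, allocating 60 and then requesting 50 leaves 40 bytes available and one waiter, while the total stays at 100.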

TransactionManager

RecordAccumulator is given a TransactionManager when created.

RecordAccumulator uses the TransactionManager when requested for the following:

appendsInProgress Counter

RecordAccumulator creates an AtomicInteger (Java) for the appendsInProgress internal counter when created.

appendsInProgress simply marks a single execution of append (and is incremented at the beginning and decremented right at the end).

The appendsInProgress counter is used by the appendsInProgress method.

appendsInProgress

boolean appendsInProgress()

appendsInProgress indicates if the appendsInProgress counter is above 0.

appendsInProgress is used when abortIncompleteBatches.
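
The increment-at-the-start, decrement-at-the-end pattern described above could look roughly like the following. This is a minimal sketch, not the RecordAccumulator code: AppendCounterSketch and the Runnable parameter are hypothetical, and the use of try/finally is an assumption about how "right at the end" is guaranteed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the counter pattern (NOT the actual RecordAccumulator).
class AppendCounterSketch {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    // Marks a single execution of an append-like operation.
    void append(Runnable body) {
        appendsInProgress.incrementAndGet();       // at the beginning
        try {
            body.run();                            // the actual append work
        } finally {
            appendsInProgress.decrementAndGet();   // at the end, even on failure
        }
    }

    // True while at least one append is executing.
    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }
}
```

Calling appendsInProgress() from inside the body passed to append returns true; before and after the call it returns false.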

flushesInProgress Counter

RecordAccumulator creates an AtomicInteger (Java) for the flushesInProgress internal counter when created.

flushesInProgress is incremented when beginFlush and decremented when awaitFlushCompletion.

flushesInProgress is used when flushInProgress.

flushInProgress

boolean flushInProgress()

flushInProgress indicates if the flushesInProgress counter is above 0.
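
The interplay between the counter and the three methods can be sketched as follows. FlushCounterSketch is a hypothetical class; it shows only the counter updates described in this section and leaves out whatever else beginFlush and awaitFlushCompletion do.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the flush counter (NOT the actual RecordAccumulator).
class FlushCounterSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    // Atomically increments the counter.
    void beginFlush() {
        flushesInProgress.getAndIncrement();
    }

    // Only the decrement is modelled here; any waiting is out of scope.
    void awaitFlushCompletion() {
        flushesInProgress.decrementAndGet();
    }

    // True while at least one flush has begun and not yet completed.
    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }
}
```

Because the counter is an integer rather than a flag, two overlapping flushes keep flushInProgress true until both have completed.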

flushInProgress is used when:

Appending Record

RecordAppendResult append(
  String topic,
  int partition,
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  AppendCallbacks callbacks,
  long maxTimeToBlock,
  boolean abortOnNewBatch,
  long nowMs,
  Cluster cluster)

append increments appendsInProgress.

append setPartition with the given AppendCallbacks.

append finds an in-progress batch (among the deque of ProducerBatches).

append tryAppend.

append...FIXME (there is so much more magic going on yet it doesn't seem as important).


append is used when:

tryAppend

RecordAppendResult tryAppend(
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  Deque<ProducerBatch> deque,
  long nowMs)

tryAppend...FIXME

ready

ReadyCheckResult ready(
  Cluster cluster,
  long nowMs)

ready determines which nodes have partitions with data ready to send (returned as a ReadyCheckResult).

ready...FIXME

ready is used when:

beginFlush

void beginFlush()

beginFlush atomically increments the flushesInProgress counter.

beginFlush is used when:

awaitFlushCompletion

void awaitFlushCompletion()

awaitFlushCompletion...FIXME

awaitFlushCompletion is used when:

  • KafkaProducer is requested to flush

splitAndReenqueue

int splitAndReenqueue(
  ProducerBatch bigBatch)

splitAndReenqueue...FIXME

splitAndReenqueue is used when:

deallocate

void deallocate(
  ProducerBatch batch)

deallocate...FIXME

deallocate is used when:

abortBatches

void abortBatches() // Uses a KafkaException
void abortBatches(
  RuntimeException reason)

abortBatches...FIXME

abortBatches is used when:

abortIncompleteBatches

void abortIncompleteBatches()

abortIncompleteBatches keeps calling abortBatches as long as there are appends in progress (appendsInProgress). It then calls abortBatches one last time, after no thread is appending, in case the last appending thread added a new batch.

In the end, abortIncompleteBatches clears the batches registry.
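
The control flow described above can be sketched as a loop followed by a final call. AbortLoopSketch is hypothetical: abortBatches only counts its invocations, a plain list stands in for the batches registry, and the do/while form (abort at least once before checking) is an assumption about how "as long as" is implemented. The sketch is single-threaded and does not model concurrent appenders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the abort loop (NOT the actual RecordAccumulator).
class AbortLoopSketch {
    final AtomicInteger appendsInProgress = new AtomicInteger(0);
    final List<String> batches = new ArrayList<>(); // stand-in for the batches registry
    int abortCalls = 0;                             // how often abortBatches ran

    boolean appendsInProgress() {
        return appendsInProgress.get() > 0;
    }

    // Placeholder: a real implementation would abort the pending batches.
    void abortBatches() {
        abortCalls++;
    }

    void abortIncompleteBatches() {
        // Keep aborting while some thread is still appending.
        do {
            abortBatches();
        } while (appendsInProgress());
        // One last pass for a batch added by the last appending thread.
        abortBatches();
        // In the end, clear the registry.
        batches.clear();
    }
}
```

With no appends in progress, the sketch calls abortBatches twice (once in the loop, once after it) and leaves the registry empty.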

abortIncompleteBatches is used when:

abortUndrainedBatches

void abortUndrainedBatches(
  RuntimeException reason)

abortUndrainedBatches...FIXME

abortUndrainedBatches is used when:

Incomplete (Pending) Batches

RecordAccumulator creates an IncompleteBatches for the incomplete internal registry of pending batches when created.

RecordAccumulator uses the IncompleteBatches when:

hasIncomplete

boolean hasIncomplete()

hasIncomplete is true when the incomplete registry is not empty.

hasIncomplete is used when:

In-Progress Batches

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

RecordAccumulator creates a ConcurrentMap (Java) for the batches internal registry of in-progress ProducerBatches (per TopicPartition).

RecordAccumulator adds a new ArrayDeque (Java) when getOrCreateDeque.
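
A per-partition registry of deques can be sketched with standard Java collections. BatchesRegistrySketch is hypothetical: String keys and values stand in for TopicPartition and ProducerBatch, and computeIfAbsent is just one way to get "create the deque on first use" semantics; whether getOrCreateDeque is implemented this way is not stated here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a per-partition registry (NOT the actual RecordAccumulator).
class BatchesRegistrySketch {
    // Assumption: String stands in for both TopicPartition and ProducerBatch.
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    // Returns the deque for the partition, adding a new ArrayDeque on first use.
    Deque<String> getOrCreateDeque(String tp) {
        return batches.computeIfAbsent(tp, key -> new ArrayDeque<>());
    }

    // Number of partitions that currently have a deque.
    int partitions() {
        return batches.size();
    }
}
```

Repeated calls with the same key return the same deque instance, so batches appended through one call are visible through the next.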

batches is used when:

getOrCreateDeque

Deque<ProducerBatch> getOrCreateDeque(
  TopicPartition tp)

getOrCreateDeque...FIXME

getOrCreateDeque is used when:

reenqueue

void reenqueue(
  ProducerBatch batch,
  long now)

reenqueue...FIXME

reenqueue is used when:

insertInSequenceOrder

void insertInSequenceOrder(
  Deque<ProducerBatch> deque,
  ProducerBatch batch)

insertInSequenceOrder...FIXME

insertInSequenceOrder is used when:

drain

Map<Integer, List<ProducerBatch>> drain(
  Cluster cluster,
  Set<Node> nodes,
  int maxSize,
  long now)

drain...FIXME

drain is used when:

drainBatchesForOneNode

List<ProducerBatch> drainBatchesForOneNode(
  Cluster cluster,
  Node node,
  int maxSize,
  long now)

drainBatchesForOneNode...FIXME

shouldStopDrainBatchesForPartition

boolean shouldStopDrainBatchesForPartition(
  ProducerBatch first,
  TopicPartition tp)

shouldStopDrainBatchesForPartition...FIXME

registerMetrics

void registerMetrics(
  Metrics metrics,
  String metricGrpName)

registerMetrics registers (adds) the metrics to the given Metrics.


registerMetrics is used when:

Logging

Enable ALL logging level for org.apache.kafka.clients.producer.internals.RecordAccumulator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL

Refer to Logging.