Skip to content

RecordAccumulator

Creating Instance

RecordAccumulator takes the following to be created:

RecordAccumulator is created along with KafkaProducer.

TransactionManager

RecordAccumulator is given a TransactionManager when created.

RecordAccumulator uses the TransactionManager when requested for the following:

appendsInProgress Counter

RecordAccumulator creates an AtomicInteger (Java) for appendsInProgress internal counter when created.

appendsInProgress simply marks a single execution of append (and is incremented at the beginning and decremented right at the end).

appendsInProgress is used when flushInProgress.

flushInProgress

boolean appendsInProgress()

appendsInProgress indicates if the appendsInProgress counter is above 0.

appendsInProgress is used when abortIncompleteBatches.

flushesInProgress Counter

RecordAccumulator creates an AtomicInteger (Java) for flushesInProgress internal counter when created.

flushesInProgress is incremented when beginFlush and decremented when awaitFlushCompletion.

flushesInProgress is used when flushInProgress.

flushInProgress

boolean flushInProgress()

flushInProgress indicates if the flushesInProgress counter is above 0.

flushInProgress is used when:

Appending Record

RecordAppendResult append(
  TopicPartition tp,
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  long maxTimeToBlock,
  boolean abortOnNewBatch,
  long nowMs)

append...FIXME

append is used when:

tryAppend

RecordAppendResult tryAppend(
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  Deque<ProducerBatch> deque,
  long nowMs)

tryAppend...FIXME

ready

ReadyCheckResult ready(
  Cluster cluster,
  long nowMs)

ready is a list of partitions with data ready to send.

ready...FIXME

ready is used when:

beginFlush

void beginFlush()

beginFlush atomically increments the flushesInProgress counter.

beginFlush is used when:

awaitFlushCompletion

void awaitFlushCompletion()

awaitFlushCompletion...FIXME

awaitFlushCompletion is used when:

  • KafkaProducer is requested to flush

splitAndReenqueue

int splitAndReenqueue(
  ProducerBatch bigBatch)

splitAndReenqueue...FIXME

splitAndReenqueue is used when:

deallocate

void deallocate(
  ProducerBatch batch)

deallocate...FIXME

deallocate is used when:

abortBatches

void abortBatches() // (1)
void abortBatches(
  RuntimeException reason)
  1. Uses a KafkaException

abortBatches...FIXME

abortBatches is used when:

abortIncompleteBatches

void abortIncompleteBatches()

abortIncompleteBatches abortBatches as long as there are appendsInProgress. abortIncompleteBatches abortBatches one last time (after no thread was appending in case there was a new batch appended by the last appending thread).

In the end, abortIncompleteBatches clears the batches registry.

abortIncompleteBatches is used when:

abortUndrainedBatches

void abortUndrainedBatches(
  RuntimeException reason)

abortUndrainedBatches...FIXME

abortUndrainedBatches is used when:

Incomplete (Pending) Batches

RecordAccumulator creates an IncompleteBatches for incomplete internal registry of pending batches when created.

RecordAccumulator uses the IncompleteBatches when:

hasIncomplete

boolean hasIncomplete()

hasIncomplete is true when the incomplete registry is not empty.

hasIncomplete is used when:

In-Progress Batches

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

RecordAccumulator creates a ConcurrentMap (Java) for the batches internal registry of in-progress ProducerBatches (per TopicPartition).

RecordAccumulator adds a new ArrayDeque (Java) when getOrCreateDeque.

batches is used when:

getOrCreateDeque

Deque<ProducerBatch> getOrCreateDeque(
  TopicPartition tp)

getOrCreateDeque...FIXME

getOrCreateDeque is used when:

reenqueue

void reenqueue(
  ProducerBatch batch,
  long now)

reenqueue...FIXME

reenqueue is used when:

insertInSequenceOrder

void insertInSequenceOrder(
  Deque<ProducerBatch> deque,
  ProducerBatch batch)

insertInSequenceOrder...FIXME

insertInSequenceOrder is used when:

drain

Map<Integer, List<ProducerBatch>> drain(
  Cluster cluster,
  Set<Node> nodes,
  int maxSize,
  long now)

drain...FIXME

drain is used when:

drainBatchesForOneNode

List<ProducerBatch> drainBatchesForOneNode(
  Cluster cluster,
  Node node,
  int maxSize,
  long now)

drainBatchesForOneNode...FIXME

shouldStopDrainBatchesForPartition

boolean shouldStopDrainBatchesForPartition(
  ProducerBatch first,
  TopicPartition tp)

shouldStopDrainBatchesForPartition...FIXME

Back to top