
KafkaMicroBatchStream

KafkaMicroBatchStream is a MicroBatchStream for Kafka Data Source for Micro-Batch Stream Processing.

Creating Instance

KafkaMicroBatchStream takes the following to be created:

  • KafkaOffsetReader
  • Kafka params for executors
  • Options (CaseInsensitiveStringMap)
  • Metadata path
  • Starting offsets (KafkaOffsetRangeLimit)
  • failOnDataLoss flag

KafkaMicroBatchStream is created when:

  • KafkaScan is requested for a MicroBatchStream (toMicroBatchStream)
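For reference, the constructor signature is roughly as follows (a sketch based on the Spark 3.x sources; ju stands for java.util):

```scala
// Abridged sketch (modifiers and extended traits omitted)
class KafkaMicroBatchStream(
    kafkaOffsetReader: KafkaOffsetReader,        // reads offsets from Kafka
    executorKafkaParams: ju.Map[String, Object], // Kafka params for executors
    options: CaseInsensitiveStringMap,           // source options
    metadataPath: String,                        // metadata log location
    startingOffsets: KafkaOffsetRangeLimit,      // where to start reading
    failOnDataLoss: Boolean)                     // failOnDataLoss flag (below)
```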

Kafka Params For Executors

KafkaMicroBatchStream is given Kafka params to use on executors when created.

The Kafka params are the kafkaParamsForExecutors based on the options of the KafkaScan (this KafkaMicroBatchStream is created for), converted so that the kafka. prefix is removed from the option keys.
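A minimal sketch of the conversion (modeled after convertToSpecifiedParams of KafkaSourceProvider):

```scala
import java.util.Locale

// Keep only the "kafka."-prefixed options and strip the prefix so the
// remaining keys can be handed to the Kafka consumers directly
def convertToSpecifiedParams(options: Map[String, String]): Map[String, String] =
  options.collect {
    case (key, value) if key.toLowerCase(Locale.ROOT).startsWith("kafka.") =>
      key.substring("kafka.".length) -> value
  }
```

For example, kafka.bootstrap.servers becomes bootstrap.servers for the Kafka consumers on executors.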

failOnDataLoss

failOnDataLoss: Boolean

KafkaMicroBatchStream is given failOnDataLoss flag when created.

failOnDataLoss is the value of failOnDataLoss option.

failOnDataLoss flag is used for the following:

  • reportDataLoss
  • Creating KafkaBatchInputPartitions (when KafkaMicroBatchStream is requested for input partitions)

SupportsTriggerAvailableNow

KafkaMicroBatchStream is a SupportsTriggerAvailableNow.

prepareForTriggerAvailableNow

prepareForTriggerAvailableNow(): Unit

prepareForTriggerAvailableNow is part of the SupportsTriggerAvailableNow abstraction.


prepareForTriggerAvailableNow sets the allDataForTriggerAvailableNow internal registry to the latest offsets (fetchLatestOffsets of the KafkaOffsetReader), with getOrCreateInitialPartitionOffsets as the known offsets.
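In code, this boils down to the following (a sketch close to the Spark sources):

```scala
override def prepareForTriggerAvailableNow(): Unit = {
  // Remember the latest offsets (given the initial partition offsets) so a
  // Trigger.AvailableNow query processes exactly the data available now and stops
  allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
    Some(getOrCreateInitialPartitionOffsets()))
}
```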

ReportsSourceMetrics

KafkaMicroBatchStream is a ReportsSourceMetrics.

Performance Metrics

metrics(
  latestConsumedOffset: Optional[Offset]): Map[String, String]

metrics is part of the ReportsSourceMetrics abstraction.


metrics returns the performance metrics for the given latestConsumedOffset and the latestPartitionOffsets internal registry (using the metrics utility below).

metrics

metrics(
  latestConsumedOffset: Optional[Offset],
  latestAvailablePartitionOffsets: Map[TopicPartition, Long]): Map[String, String]

metrics converts the given latestConsumedOffset to a KafkaSourceOffset when defined.

metrics returns the following performance metrics, based on how far the consumed offsets (the partitionToOffsets of the KafkaSourceOffset) are behind the given latestAvailablePartitionOffsets:

  • minOffsetsBehindLatest
  • maxOffsetsBehindLatest
  • avgOffsetsBehindLatest
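A minimal sketch of how the three metrics can be computed (assuming consumed and latest hold the per-partition offsets of the given latestConsumedOffset and latestAvailablePartitionOffsets, respectively; the helper name is illustrative):

```scala
import org.apache.kafka.common.TopicPartition

// How far behind the latest available offsets the consumed offsets are,
// summarized as min/max/avg across all topic-partitions
// (assumes every partition with a latest offset also has a consumed offset)
def offsetsBehindLatestMetrics(
    consumed: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long]): Map[String, String] = {
  val behind = latest.map { case (tp, latestOffset) => latestOffset - consumed(tp) }
  if (behind.isEmpty) Map.empty
  else Map(
    "minOffsetsBehindLatest" -> behind.min.toString,
    "maxOffsetsBehindLatest" -> behind.max.toString,
    "avgOffsetsBehindLatest" -> (behind.sum.toDouble / behind.size).toString)
}
```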

Default Read Limit

getDefaultReadLimit: ReadLimit

getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.


getDefaultReadLimit uses minOffsetsPerTrigger and maxOffsetsPerTrigger options to determine ReadLimit.

getDefaultReadLimit uses the maxTriggerDelay option, too, but it has a default value and so is always available.

| ReadLimit | Condition |
|-----------|-----------|
| CompositeReadLimit | Both minOffsetsPerTrigger and maxOffsetsPerTrigger defined |
| ReadMinRows | Only minOffsetsPerTrigger defined |
| ReadMaxRows | Only maxOffsetsPerTrigger defined |
| ReadAllAvailable | Neither minOffsetsPerTrigger nor maxOffsetsPerTrigger defined |

In other words, with minOffsetsPerTrigger and maxOffsetsPerTrigger defined, getDefaultReadLimit creates a CompositeReadLimit with the following:

  • ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs)
  • ReadMaxRows with maxOffsetsPerTrigger

With only minOffsetsPerTrigger defined (with no maxOffsetsPerTrigger), getDefaultReadLimit creates a ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs).

Otherwise, getDefaultReadLimit takes the maxOffsetsPerTrigger, if defined, and creates a ReadMaxRows (with the approximate maximum rows to scan) or defaults to ReadAllAvailable.
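The whole decision can be sketched with the ReadLimit factory methods of the Data Source API (minOffsetPerTrigger, maxOffsetsPerTrigger and maxTriggerDelayMs are described below):

```scala
import org.apache.spark.sql.connector.read.streaming.ReadLimit

def defaultReadLimit(
    minOffsetPerTrigger: Option[Long],
    maxOffsetsPerTrigger: Option[Long],
    maxTriggerDelayMs: Long): ReadLimit =
  (minOffsetPerTrigger, maxOffsetsPerTrigger) match {
    case (Some(min), Some(max)) =>
      // Both limits defined: a composite of ReadMinRows and ReadMaxRows
      ReadLimit.compositeLimit(Array(
        ReadLimit.minRows(min, maxTriggerDelayMs), ReadLimit.maxRows(max)))
    case (Some(min), None) => ReadLimit.minRows(min, maxTriggerDelayMs)
    case (None, Some(max)) => ReadLimit.maxRows(max)
    case (None, None)      => ReadLimit.allAvailable()
  }
```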

maxOffsetsPerTrigger

maxOffsetsPerTrigger: Option[Long]

KafkaMicroBatchStream takes the value of maxOffsetsPerTrigger option (in the options), if available, when created. Otherwise, maxOffsetsPerTrigger is None (undefined).

minOffsetPerTrigger

minOffsetPerTrigger: Option[Long]

When created, KafkaMicroBatchStream takes the value of minOffsetsPerTrigger option (in the options), if available, or defaults to None (undefined).

minOffsetPerTrigger is used to determine the default limit on the number of records to read.

maxTriggerDelayMs

KafkaMicroBatchStream reads the value of maxTriggerDelay option (in the options) when created and converts it to milliseconds (default: 15m).
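A sketch of how the three options can be read off the source options (JavaUtils.timeStringAsMs parses time strings like 15m; 15m is the default of maxTriggerDelay in the Spark sources):

```scala
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.util.CaseInsensitiveStringMap

def admissionControlOptions(
    options: CaseInsensitiveStringMap): (Option[Long], Option[Long], Long) = {
  val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
  val minOffsetPerTrigger  = Option(options.get("minOffsetsPerTrigger")).map(_.toLong)
  // maxTriggerDelay defaults to 15m and is converted to milliseconds
  val maxTriggerDelayMs =
    JavaUtils.timeStringAsMs(Option(options.get("maxTriggerDelay")).getOrElse("15m"))
  (maxOffsetsPerTrigger, minOffsetPerTrigger, maxTriggerDelayMs)
}
```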

Latest Offset

latestOffset(
  start: Offset,
  readLimit: ReadLimit): Offset

latestOffset is part of the SupportsAdmissionControl abstraction.


latestOffset converts the given start offset to a KafkaSourceOffset to request for the partitionToOffsets.

latestOffset sets the latestPartitionOffsets internal registry to allDataForTriggerAvailableNow, if available, or requests the KafkaOffsetReader to fetchLatestOffsets (with the partition offsets of the start offset as the known offsets).

Note

allDataForTriggerAvailableNow is only available after prepareForTriggerAvailableNow, i.e. for queries executed with Trigger.AvailableNow.

latestOffset requests the given ReadLimit for read limits if it is a CompositeReadLimit. Otherwise, latestOffset uses the given ReadLimit as the only read limit.

latestOffset determines the offsets to read based on the read limits.

In the end, latestOffset records the offsets in the endPartitionOffsets registry.

Summary

endPartitionOffsets can be as follows based on the read limits:

  • latestPartitionOffsets when ReadAllAvailable is among the read limits
  • The partition offsets of the start offset when a ReadMinRows limit decides to delay the batch (delayBatch)
  • Rate-limited offsets (approximately maxRows new records at most) for a ReadMaxRows limit
  • latestPartitionOffsets, otherwise
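A sketch of that decision, using delayBatch (described below) and a hypothetical rateLimit helper that caps the per-partition offsets at approximately maxRows new records in total:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxRows, ReadMinRows}

// Assumed helpers (signatures only; delayBatch is sketched below)
def delayBatch(minLimit: Long, latest: Map[TopicPartition, Long],
    current: Map[TopicPartition, Long], maxTriggerDelayMs: Long): Boolean = ???
def rateLimit(maxRows: Long, from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = ???

def pickEndOffsets(
    readLimits: Seq[ReadLimit],
    startOffsets: Map[TopicPartition, Long],
    latestOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
  if (readLimits.exists(_.isInstanceOf[ReadAllAvailable])) {
    latestOffsets
  } else {
    val minLimit = readLimits.collectFirst { case l: ReadMinRows => l }
    val maxLimit = readLimits.collectFirst { case l: ReadMaxRows => l }
    minLimit.flatMap { limit =>
      // Not enough new records yet: stay at the start offsets (delay the batch)
      if (delayBatch(limit.minRows, latestOffsets, startOffsets, limit.maxTriggerDelayMs))
        Some(startOffsets)
      else None
    }.getOrElse {
      maxLimit
        .map(limit => rateLimit(limit.maxRows, startOffsets, latestOffsets))
        .getOrElse(latestOffsets)
    }
  }
```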

delayBatch

delayBatch(
  minLimit: Long,
  latestOffsets: Map[TopicPartition, Long],
  currentOffsets: Map[TopicPartition, Long],
  maxTriggerDelayMs: Long): Boolean

Summary

delayBatch is false (no delay) when either holds:

  • The given maxTriggerDelayMs has passed since lastTriggerMillis
  • The total of new records (across all topics and partitions given latestOffsets and currentOffsets) is at least the given minLimit

Otherwise, delayBatch is true (delay).

| Input Parameter | Value |
|-----------------|-------|
| minLimit | minOffsetPerTrigger |
| latestOffsets | latestPartitionOffsets |
| currentOffsets | Offsets by partition of the start offset (as given to latestOffset) |
| maxTriggerDelayMs | maxTriggerDelayMs |

If the given maxTriggerDelayMs has passed (since lastTriggerMillis), delayBatch prints out the following DEBUG message to the logs, records the current timestamp in the lastTriggerMillis registry, and returns false (no delay).

Maximum wait time is passed, triggering batch

Otherwise, delayBatch calculates the number of new records (based on the given latestOffsets and currentOffsets).

If the number of new records is below the given minLimit, delayBatch returns true (delay). Otherwise, delayBatch records the current timestamp in the lastTriggerMillis registry and returns false (no delay).
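Putting delayBatch together (a sketch; lastTriggerMillis is assumed to be a mutable field of the stream):

```scala
import org.apache.kafka.common.TopicPartition

var lastTriggerMillis = 0L

def delayBatch(
    minLimit: Long,
    latestOffsets: Map[TopicPartition, Long],
    currentOffsets: Map[TopicPartition, Long],
    maxTriggerDelayMs: Long): Boolean =
  if (System.currentTimeMillis() - lastTriggerMillis >= maxTriggerDelayMs) {
    // "Maximum wait time is passed, triggering batch"
    lastTriggerMillis = System.currentTimeMillis()
    false
  } else {
    // Total number of new records across all topics and partitions
    val newRecords = latestOffsets.map { case (tp, latest) =>
      latest - currentOffsets.getOrElse(tp, 0L)
    }.sum
    if (newRecords < minLimit) true // delay
    else {
      lastTriggerMillis = System.currentTimeMillis()
      false
    }
  }
```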

reportDataLoss

reportDataLoss(message: String): Unit

With failOnDataLoss enabled, reportDataLoss throws an IllegalStateException (with the given message):

[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.
If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false"

Otherwise, reportDataLoss prints out the following WARN message (with the given message) to the logs:

[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed.
If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true"
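The behavior boils down to the following (a sketch; failOnDataLoss and a logWarning facility are assumed in scope, message texts abbreviated):

```scala
// Throw with failOnDataLoss enabled, warn otherwise
def reportDataLoss(message: String): Unit =
  if (failOnDataLoss) {
    throw new IllegalStateException(s"$message. Some data may have been lost ...")
  } else {
    logWarning(s"$message. Some data may have been lost ...")
  }
```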

reportDataLoss is used when:

  • KafkaMicroBatchStream is requested for input partitions (planInputPartitions)

String Representation

toString(): String

toString is part of the Object (Java) abstraction.


toString is the following (with the string representation of the KafkaOffsetReader):

KafkaV2[[kafkaOffsetReader]]

Logging

Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchStream=ALL

Refer to Logging.