KafkaMicroBatchStream is a MicroBatchStream for the Kafka Data Source in Micro-Batch Stream Processing.
KafkaMicroBatchStream takes the following to be created:
- KafkaOffsetReader
- Kafka Params For Executors
- Options
- Metadata path
- Starting KafkaOffsetRangeLimit
- failOnDataLoss flag
KafkaMicroBatchStream is created when:
KafkaScan is requested for a MicroBatchStream
Kafka Params For Executors¶
KafkaMicroBatchStream is given Kafka params to use on executors when created.
The Kafka params are the kafkaParamsForExecutors based on the options of the KafkaScan (this KafkaMicroBatchStream is created for), converted to executor-specific Kafka parameters.
failOnDataLoss Flag¶
KafkaMicroBatchStream is given a failOnDataLoss flag when created.
failOnDataLoss is the value of the failOnDataLoss option.
The failOnDataLoss flag is used for reportDataLoss.
SupportsTriggerAvailableNow¶
KafkaMicroBatchStream is a SupportsTriggerAvailableNow.
prepareForTriggerAvailableNow is part of the SupportsTriggerAvailableNow abstraction.
prepareForTriggerAvailableNow sets the allDataForTriggerAvailableNow internal registry to fetchLatestOffsets (of the KafkaOffsetReader) for getOrCreateInitialPartitionOffsets.
ReportsSourceMetrics¶
KafkaMicroBatchStream is a ReportsSourceMetrics.
metrics(latestConsumedOffset: Optional[Offset]): Map[String, String]
metrics is part of the ReportsSourceMetrics abstraction.
metrics returns the metrics for the given latestConsumedOffset and the latestPartitionOffsets.
metrics(latestConsumedOffset: Optional[Offset], latestAvailablePartitionOffsets: Map[TopicPartition, Long]): Map[String, String]
metrics converts the given latestConsumedOffset to a KafkaSourceOffset when defined.
metrics returns performance metrics for the offsets (in the partitionToOffsets of the given KafkaSourceOffset) behind the latest offsets (the given latestAvailablePartitionOffsets).
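The computation can be sketched as follows. This is a simplified model, not Spark's implementation: `TopicPartition` is re-declared as a plain case class, `offsetsBehindMetrics` is a hypothetical name, and the metric keys mirror the offsets-behind-latest metrics the Kafka source reports in recent Spark versions.

```scala
// Simplified stand-in for org.apache.kafka.common.TopicPartition
case class TopicPartition(topic: String, partition: Int)

// Hypothetical sketch: distance of the consumed offsets behind the latest available
def offsetsBehindMetrics(
    latestConsumedOffset: Option[Map[TopicPartition, Long]],
    latestAvailablePartitionOffsets: Map[TopicPartition, Long]): Map[String, String] =
  latestConsumedOffset match {
    case Some(consumed) if latestAvailablePartitionOffsets.nonEmpty =>
      // How far behind the latest available offset each partition's consumed offset is
      val behind = latestAvailablePartitionOffsets.map { case (tp, latest) =>
        latest - consumed.getOrElse(tp, latest)
      }
      Map(
        "minOffsetsBehindLatest" -> behind.min.toString,
        "maxOffsetsBehindLatest" -> behind.max.toString,
        "avgOffsetsBehindLatest" -> (behind.sum.toDouble / behind.size).toString)
    case _ => Map.empty // no consumed offset yet: no metrics
  }
```

With no consumed offset (e.g. before the first batch completes), no metrics are reported.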
Default Read Limit¶
getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.
getDefaultReadLimit uses minOffsetsPerTrigger and maxOffsetsPerTrigger options to determine ReadLimit.
getDefaultReadLimit uses the maxTriggerDelayMs option, too, but it has a default value and so is always available.
| ReadLimit | Condition |
|---|---|
| CompositeReadLimit | Both minOffsetsPerTrigger and maxOffsetsPerTrigger defined |
| ReadMinRows | Only minOffsetsPerTrigger defined |
| ReadMaxRows | Only maxOffsetsPerTrigger defined |
In other words, with minOffsetsPerTrigger and maxOffsetsPerTrigger defined,
getDefaultReadLimit creates a CompositeReadLimit with the following:
- ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs)
- ReadMaxRows with maxOffsetsPerTrigger
With only minOffsetsPerTrigger defined (and no maxOffsetsPerTrigger), getDefaultReadLimit creates a ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs).
With only maxOffsetsPerTrigger defined, getDefaultReadLimit creates a ReadMaxRows (with the approximate maximum rows to scan). With neither defined, getDefaultReadLimit defaults to ReadAllAvailable.
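The selection logic above can be sketched as follows (a simplified model with the ReadLimit hierarchy re-declared locally as case classes; Spark's actual ReadLimit types live in the DataSource V2 API):

```scala
// Local stand-ins for the DataSource V2 ReadLimit hierarchy
sealed trait ReadLimit
case object ReadAllAvailable extends ReadLimit
case class ReadMinRows(minRows: Long, maxTriggerDelayMs: Long) extends ReadLimit
case class ReadMaxRows(maxRows: Long) extends ReadLimit
case class CompositeReadLimit(limits: Seq[ReadLimit]) extends ReadLimit

def defaultReadLimit(
    minOffsetsPerTrigger: Option[Long],
    maxOffsetsPerTrigger: Option[Long],
    maxTriggerDelayMs: Long): ReadLimit =
  (minOffsetsPerTrigger, maxOffsetsPerTrigger) match {
    case (Some(min), Some(max)) =>
      // Both defined: combine a lower and an upper bound
      CompositeReadLimit(Seq(ReadMinRows(min, maxTriggerDelayMs), ReadMaxRows(max)))
    case (Some(min), None) => ReadMinRows(min, maxTriggerDelayMs)
    case (None, Some(max)) => ReadMaxRows(max)
    case (None, None)      => ReadAllAvailable
  }
```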
KafkaMicroBatchStream takes the value of the maxOffsetsPerTrigger option (in the options), if available, when created. Otherwise, maxOffsetsPerTrigger is undefined (None).
KafkaMicroBatchStream takes the value of the minOffsetsPerTrigger option (in the options), if available, or defaults to None (undefined).
minOffsetsPerTrigger is used to determine the default limit on the number of records to read.
KafkaMicroBatchStream reads the value of the maxTriggerDelay option (in the options) when created.
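Reading these options can be sketched as follows. This is a simplified model: the options are a plain Map rather than a CaseInsensitiveStringMap, maxTriggerDelay is modeled as a raw milliseconds value (Spark accepts a time string such as "15m"), and the 15-minute default is an assumption here.

```scala
// Hypothetical sketch of parsing the trigger-related options
def readTriggerOptions(
    options: Map[String, String]): (Option[Long], Option[Long], Long) = {
  val minOffsetsPerTrigger = options.get("minOffsetsPerTrigger").map(_.toLong)
  val maxOffsetsPerTrigger = options.get("maxOffsetsPerTrigger").map(_.toLong)
  // maxTriggerDelay modeled as milliseconds, assumed to default to 15 minutes
  val maxTriggerDelayMs =
    options.get("maxTriggerDelay").map(_.toLong).getOrElse(15L * 60 * 1000)
  (minOffsetsPerTrigger, maxOffsetsPerTrigger, maxTriggerDelayMs)
}
```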
Latest Offset¶
latestOffset(start: Offset, readLimit: ReadLimit): Offset
latestOffset is part of the SupportsAdmissionControl abstraction.
latestOffset converts the given start offset to a KafkaSourceOffset to access its partitionToOffsets.
latestOffset sets the latestPartitionOffsets internal registry to be as follows:
- allDataForTriggerAvailableNow, if available
- fetchLatestOffsets of the KafkaOffsetReader (for the partitionToOffsets of the given KafkaSourceOffset), otherwise
When is allDataForTriggerAvailableNow available?
latestOffset requests the given ReadLimit for read limits if it is a CompositeReadLimit. Otherwise,
latestOffset uses the given ReadLimit as the only read limit.
latestOffset determines the offsets to read based on the read limits.
With ReadAllAvailable among the read limits, latestOffset uses the latestPartitionOffsets registry.
ReadAllAvailable has the highest priority as it is necessary for Trigger.Once to work properly.
With ReadMinRows among the read limits, latestOffset checks whether to skip this trigger or not (using the ReadMinRows limit as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset).
If there are not enough rows available (based on the ReadMinRows limit) and maxTriggerDelayMs has not elapsed yet, latestOffset prints out the following DEBUG message to the logs:
Delaying batch as number of records available is less than minOffsetsPerTrigger
With ReadMaxRows among the read limits, latestOffset rateLimits the offsets (with the maxRows as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset).
With neither ReadMinRows nor ReadMaxRows among the read limits, latestOffset uses the latestPartitionOffsets registry (as if ReadAllAvailable were among the read limits).
In the end, latestOffset records the offsets in the endPartitionOffsets registry.
endPartitionOffsets can be as follows based on the read limits:
- latestPartitionOffsets for ReadAllAvailable
- partitionToOffsets of the given KafkaSourceOffset for ReadMinRows with a delayed batch
- rateLimit for ReadMaxRows
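The priority among the read limits can be sketched as follows. This is a deliberately simplified model: the limits are reduced to flags, partitions are keyed by Int, and the delayBatch and rateLimit results are passed in as inputs (`minRowsDelayed`, `rateLimited`) rather than computed.

```scala
// Hypothetical sketch of how latestOffset chooses the end offsets
def pickEndOffsets(
    allAvailable: Boolean,                // ReadAllAvailable among the limits
    minRowsDelayed: Option[Boolean],      // Some(delayBatch result) when ReadMinRows present
    rateLimited: Option[Map[Int, Long]],  // rateLimit result when ReadMaxRows present
    startOffsets: Map[Int, Long],
    latestOffsets: Map[Int, Long]): Map[Int, Long] =
  if (allAvailable) latestOffsets                       // highest priority (Trigger.Once)
  else if (minRowsDelayed.contains(true)) startOffsets  // not enough rows: skip this trigger
  else rateLimited.getOrElse(latestOffsets)             // ReadMaxRows caps, else all available
```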
delayBatch¶
delayBatch(minLimit: Long, latestOffsets: Map[TopicPartition, Long], currentOffsets: Map[TopicPartition, Long], maxTriggerDelayMs: Long): Boolean

delayBatch returns false (no delay) when either holds:
- The given maxTriggerDelayMs has passed since lastTriggerMillis
- The total of new records (across all topics and partitions given the currentOffsets) is at least the given minLimit

| Argument | Value |
|---|---|
| currentOffsets | Offsets by Partitions of the start offset (as given to latestOffset) |
If the given maxTriggerDelayMs has passed (since lastTriggerMillis), delayBatch prints out the following DEBUG message to the logs, records the current timestamp in the lastTriggerMillis registry and returns false (no delay):
Maximum wait time is passed, triggering batch
Otherwise, delayBatch calculates the number of new records (based on the given latestOffsets and currentOffsets).
If the number of new records is below the given minLimit, delayBatch returns true (delay). Otherwise, delayBatch records the current timestamp in the lastTriggerMillis registry and returns false (no delay).
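The decision above can be sketched as follows. This is a simplified model: partitions are keyed by Int, and the current time is passed in as `nowMillis` for determinism (Spark uses the system clock).

```scala
// Mutable registry recording when the last batch was triggered
var lastTriggerMillis = 0L

// Hypothetical sketch of the delayBatch decision; returns true to delay the batch
def delayBatch(
    minLimit: Long,
    latestOffsets: Map[Int, Long],
    currentOffsets: Map[Int, Long],
    maxTriggerDelayMs: Long,
    nowMillis: Long): Boolean =
  if (nowMillis - lastTriggerMillis >= maxTriggerDelayMs) {
    // Maximum wait time has passed: trigger the batch regardless of record count
    lastTriggerMillis = nowMillis
    false
  } else {
    // Total new records across all partitions
    val newRecords = latestOffsets.map { case (p, latest) =>
      latest - currentOffsets.getOrElse(p, 0L)
    }.sum
    if (newRecords < minLimit) true // not enough rows available: delay
    else {
      lastTriggerMillis = nowMillis
      false
    }
  }
```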
Reporting Data Loss¶
reportDataLoss(message: String): Unit
With failOnDataLoss enabled, reportDataLoss throws an IllegalStateException (with the given message):
[message]. Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false"
With failOnDataLoss disabled, reportDataLoss prints out the following WARN message (with the given message) to the logs:
[message]. Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true"
reportDataLoss is used when:
KafkaMicroBatchStream is requested to planInputPartitions and getOrCreateInitialPartitionOffsets
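The failOnDataLoss branching above can be sketched as follows (messages abbreviated; `warn` stands in for the WARN-level logger):

```scala
// Hypothetical sketch of reportDataLoss: fail or warn based on failOnDataLoss
def reportDataLoss(
    failOnDataLoss: Boolean,
    message: String,
    warn: String => Unit): Unit =
  if (failOnDataLoss) {
    // Fail the query: data is no longer available in Kafka
    throw new IllegalStateException(
      s"$message. Some data may have been lost. " +
        "Set the source option \"failOnDataLoss\" to \"false\" to not fail.")
  } else {
    // Log and continue: the query tolerates data loss
    warn(s"$message. Some data may have been lost. " +
      "Set the source option \"failOnDataLoss\" to \"true\" to fail.")
  }
```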
String Representation¶
toString is part of the Object (Java) abstraction.
toString is the following (with the string representation of the KafkaOffsetReader):

KafkaV2[[kafkaOffsetReader]]
Logging¶
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchStream=ALL

Refer to Logging.