KafkaMicroBatchStream¶
KafkaMicroBatchStream is a MicroBatchStream for Kafka Data Source for Micro-Batch Stream Processing.
Creating Instance¶
KafkaMicroBatchStream takes the following to be created:
- KafkaOffsetReader
- Kafka Params For Executors
- Options
- Metadata path
- Starting KafkaOffsetRangeLimit
- failOnDataLoss
KafkaMicroBatchStream is created when:
KafkaScan is requested for a MicroBatchStream
Kafka Params For Executors¶
KafkaMicroBatchStream is given Kafka params to use on executors when created.
The Kafka params are the kafkaParamsForExecutors based on the options of the KafkaScan (this KafkaMicroBatchStream is created for) that have been converted (kafka-prefix removed).
failOnDataLoss¶
failOnDataLoss: Boolean
KafkaMicroBatchStream is given failOnDataLoss flag when created.
failOnDataLoss is the value of the failOnDataLoss option.
The failOnDataLoss flag is used when KafkaMicroBatchStream is requested to reportDataLoss.
SupportsTriggerAvailableNow¶
KafkaMicroBatchStream is a SupportsTriggerAvailableNow.
prepareForTriggerAvailableNow¶
prepareForTriggerAvailableNow(): Unit
prepareForTriggerAvailableNow is part of the SupportsTriggerAvailableNow abstraction.
prepareForTriggerAvailableNow sets the allDataForTriggerAvailableNow internal registry to fetchLatestOffsets (of the KafkaOffsetReader) for getOrCreateInitialPartitionOffsets.
ReportsSourceMetrics¶
KafkaMicroBatchStream is a ReportsSourceMetrics.
Performance Metrics¶
metrics(
latestConsumedOffset: Optional[Offset]): Map[String, String]
metrics is part of the ReportsSourceMetrics abstraction.
metrics returns the metrics for the given latestConsumedOffset and the latestPartitionOffsets.
metrics¶
metrics(
latestConsumedOffset: Optional[Offset],
latestAvailablePartitionOffsets: Map[TopicPartition, Long]): Map[String, String]
metrics converts the given latestConsumedOffset to a KafkaSourceOffset when defined.
metrics returns the following performance metrics that describe how far the partitionToOffsets (of the given latestConsumedOffset) are behind the given latestAvailablePartitionOffsets:
- minOffsetsBehindLatest
- maxOffsetsBehindLatest
- avgOffsetsBehindLatest
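The per-partition lag computation behind these metrics can be sketched as follows (a simplified, self-contained sketch: partition keys are plain strings here, whereas the real code uses Kafka's TopicPartition, and the exact averaging may differ):

```scala
// Simplified sketch of the offsets-behind-latest metrics.
// Partition keys are hypothetical strings; Spark uses Kafka's TopicPartition.
object MetricsSketch {
  def metrics(
      latestConsumedOffsets: Map[String, Long],
      latestAvailablePartitionOffsets: Map[String, Long]): Map[String, String] = {
    // Per partition: how far the consumed offset lags behind the latest available one
    val offsetsBehindLatest = latestAvailablePartitionOffsets.map {
      case (tp, latest) => latest - latestConsumedOffsets.getOrElse(tp, 0L)
    }
    if (offsetsBehindLatest.isEmpty) Map.empty
    else Map(
      "minOffsetsBehindLatest" -> offsetsBehindLatest.min.toString,
      "maxOffsetsBehindLatest" -> offsetsBehindLatest.max.toString,
      "avgOffsetsBehindLatest" -> (offsetsBehindLatest.sum / offsetsBehindLatest.size).toString)
  }
}
```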
Default Read Limit¶
getDefaultReadLimit: ReadLimit
getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.
getDefaultReadLimit uses minOffsetsPerTrigger and maxOffsetsPerTrigger options to determine ReadLimit.
getDefaultReadLimit uses the maxTriggerDelay option, too, but it has a default value so it is always available.
| ReadLimit | Condition |
|---|---|
| CompositeReadLimit | Both minOffsetsPerTrigger and maxOffsetsPerTrigger defined |
| ReadMinRows | Only minOffsetsPerTrigger defined |
| ReadMaxRows | Only maxOffsetsPerTrigger defined |
| ReadAllAvailable | Neither minOffsetsPerTrigger nor maxOffsetsPerTrigger defined |
In other words, with minOffsetsPerTrigger and maxOffsetsPerTrigger defined, getDefaultReadLimit creates a CompositeReadLimit with the following:
- ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs)
- ReadMaxRows with maxOffsetsPerTrigger
With only minOffsetsPerTrigger defined (and no maxOffsetsPerTrigger), getDefaultReadLimit creates a ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs).
Otherwise, getDefaultReadLimit takes the maxOffsetsPerTrigger, if defined, and creates a ReadMaxRows (with the approximate maximum rows to scan) or defaults to ReadAllAvailable.
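The selection logic above can be sketched with a self-contained Scala snippet (the ReadLimit case classes below are hypothetical stand-ins for Spark's actual types in org.apache.spark.sql.connector.read.streaming):

```scala
// Hypothetical stand-ins for Spark's ReadLimit hierarchy
sealed trait ReadLimitSketch
case class ReadMinRows(minRows: Long, maxTriggerDelayMs: Long) extends ReadLimitSketch
case class ReadMaxRows(maxRows: Long) extends ReadLimitSketch
case class CompositeReadLimit(limits: Seq[ReadLimitSketch]) extends ReadLimitSketch
case object ReadAllAvailable extends ReadLimitSketch

// Sketch of getDefaultReadLimit's decision based on the two options
def defaultReadLimit(
    minOffsetPerTrigger: Option[Long],
    maxOffsetsPerTrigger: Option[Long],
    maxTriggerDelayMs: Long): ReadLimitSketch =
  (minOffsetPerTrigger, maxOffsetsPerTrigger) match {
    case (Some(min), Some(max)) =>
      // Both defined: combine a minimum and a maximum into a composite limit
      CompositeReadLimit(Seq(ReadMinRows(min, maxTriggerDelayMs), ReadMaxRows(max)))
    case (Some(min), None) => ReadMinRows(min, maxTriggerDelayMs)
    case (None, Some(max)) => ReadMaxRows(max)
    case (None, None)      => ReadAllAvailable
  }
```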
maxOffsetsPerTrigger¶
maxOffsetsPerTrigger: Option[Long]
KafkaMicroBatchStream takes the value of maxOffsetsPerTrigger option (in the options), if available, when created. Otherwise, maxOffsetsPerTrigger is None (undefined).
minOffsetPerTrigger¶
minOffsetPerTrigger: Option[Long]
When created, KafkaMicroBatchStream takes the value of minOffsetsPerTrigger option (in the options), if available, or defaults to None (undefined).
minOffsetPerTrigger is used to determine the default limit on the number of records to read.
maxTriggerDelayMs¶
KafkaMicroBatchStream reads the value of maxTriggerDelay option (in the options) when created.
Latest Offset¶
latestOffset(
start: Offset,
readLimit: ReadLimit): Offset
latestOffset is part of the SupportsAdmissionControl abstraction.
latestOffset converts the given start offset to a KafkaSourceOffset to request for the partitionToOffsets.
latestOffset sets the latestPartitionOffsets internal registry to be as follows:
- allDataForTriggerAvailableNow, if available
- fetchLatestOffsets of the KafkaOffsetReader (for the partitionToOffsets of the given KafkaSourceOffset), otherwise
FIXME
When is allDataForTriggerAvailableNow available?
latestOffset requests the given ReadLimit for read limits if it is a CompositeReadLimit. Otherwise, latestOffset uses the given ReadLimit as the only read limit.
latestOffset determines the offsets to read based on the read limits:
- With ReadAllAvailable among the read limits, latestOffset uses the latestPartitionOffsets registry. ReadAllAvailable has the highest priority as it is necessary for Trigger.Once to work properly.
- With ReadMinRows among the read limits, latestOffset checks whether to skip this trigger or not (using the minRows and maxTriggerDelayMs of this ReadMinRows as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset). If there are not enough rows available (based on minRows) and maxTriggerDelayMs has not elapsed yet, latestOffset prints out the following DEBUG message to the logs:
  Delaying batch as number of records available is less than minOffsetsPerTrigger
- With ReadMaxRows among the read limits, latestOffset rateLimits the offsets to read (using the maxRows of this ReadMaxRows as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset).
- With neither ReadMinRows nor ReadMaxRows among the read limits, latestOffset uses the latestPartitionOffsets registry (as if ReadAllAvailable were among the read limits).
In the end, latestOffset records the offsets in the endPartitionOffsets registry.
Summary
endPartitionOffsets can be as follows based on the read limits:
- latestPartitionOffsets for ReadAllAvailable
- partitionToOffsets of the given KafkaSourceOffset for ReadMinRows and a delayed batch
- rateLimit for ReadMaxRows
- latestPartitionOffsets otherwise
delayBatch¶
delayBatch(
minLimit: Long,
latestOffsets: Map[TopicPartition, Long],
currentOffsets: Map[TopicPartition, Long],
maxTriggerDelayMs: Long): Boolean
Summary
delayBatch is false (no delay) when either of the following holds:
- The given maxTriggerDelayMs has passed since lastTriggerMillis
- The total number of new records (across all topics and partitions, given latestOffsets and currentOffsets) is at least the given minLimit
Otherwise, delayBatch is true (delay).
| Input Parameter | Value |
|---|---|
| minLimit | minOffsetPerTrigger |
| latestOffsets | latestPartitionOffsets |
| currentOffsets | Offsets by Partitions of the start offset (as given to latestOffset) |
| maxTriggerDelayMs | maxTriggerDelayMs |
If the given maxTriggerDelayMs has passed (since lastTriggerMillis), delayBatch prints out the following DEBUG message to the logs, records the current timestamp in lastTriggerMillis registry and returns false (no delay).
Maximum wait time is passed, triggering batch
Otherwise, delayBatch calculates the number of new records (based on the given latestOffsets and currentOffsets).
If the number of new records is below the given minLimit, delayBatch returns true (delay). Otherwise, delayBatch records the current timestamp in lastTriggerMillis registry and returns false (no delay).
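The control flow above can be sketched as follows (a self-contained sketch: System.currentTimeMillis and the lastTriggerMillis registry are modeled as explicit parameters for testability, and partition keys are plain strings instead of Kafka's TopicPartition):

```scala
// Sketch of delayBatch: returns (delay?, updated lastTriggerMillis).
// In Spark, lastTriggerMillis is a mutable registry and "now" is the wall clock.
def delayBatch(
    minLimit: Long,
    latestOffsets: Map[String, Long],
    currentOffsets: Map[String, Long],
    maxTriggerDelayMs: Long,
    lastTriggerMillis: Long,
    nowMillis: Long): (Boolean, Long) = {
  if (nowMillis - lastTriggerMillis >= maxTriggerDelayMs) {
    // Maximum wait time has passed: trigger the batch regardless of volume
    (false, nowMillis)
  } else {
    // Total new records since the current offsets, across all partitions
    val newRecords = latestOffsets.map {
      case (tp, latest) => latest - currentOffsets.getOrElse(tp, 0L)
    }.sum
    if (newRecords < minLimit) (true, lastTriggerMillis) // delay the batch
    else (false, nowMillis)                              // enough records: trigger
  }
}
```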
reportDataLoss¶
reportDataLoss(message: String): Unit
With failOnDataLoss enabled, reportDataLoss throws an IllegalStateException (with the given message):
[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.
If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false"
Otherwise, reportDataLoss prints out the following WARN message (with the given message) to the logs:
[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed.
If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true"
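The branching can be sketched as follows (a minimal sketch: the message texts are abbreviated and the WARN logger is modeled as a callback):

```scala
// Sketch of reportDataLoss: fail the query or only warn, per failOnDataLoss.
// The full message texts are abbreviated; logWarning is a hypothetical callback.
def reportDataLossSketch(
    failOnDataLoss: Boolean,
    message: String,
    logWarning: String => Unit): Unit = {
  if (failOnDataLoss) {
    // Fail the streaming query
    throw new IllegalStateException(
      s"""$message. Some data may have been lost ... set the source option "failOnDataLoss" to "false"""")
  } else {
    // Keep the query running and only warn
    logWarning(
      s"""$message. Some data may have been lost ... set the source option "failOnDataLoss" to "true"""")
  }
}
```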
reportDataLoss is used when:
KafkaMicroBatchStream is requested to planInputPartitions and getOrCreateInitialPartitionOffsets
String Representation¶
toString(): String
toString is part of the Object (Java) abstraction.
toString is the following (with the string representation of the KafkaOffsetReader):
KafkaV2[[kafkaOffsetReader]]
Logging¶
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchStream logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchStream=ALL
Refer to Logging.