KafkaMicroBatchStream¶
KafkaMicroBatchStream
is a MicroBatchStream for Kafka Data Source for Micro-Batch Stream Processing.
Creating Instance¶
KafkaMicroBatchStream
takes the following to be created:
- KafkaOffsetReader
- Kafka Params For Executors
- Options
- Metadata path
- Starting KafkaOffsetRangeLimit
- failOnDataLoss
KafkaMicroBatchStream
is created when:
KafkaScan
is requested for a MicroBatchStream
Kafka Params For Executors¶
KafkaMicroBatchStream
is given Kafka params to use on executors when created.
The Kafka params are the kafkaParamsForExecutors based on the options of the KafkaScan (this KafkaMicroBatchStream is created for) that have been converted (the `kafka.` prefix removed).
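As an illustration, the conversion can be sketched as follows (a minimal standalone sketch, not Spark's exact code; the option values are made up):

```scala
// Keep only kafka.-prefixed entries and strip the prefix, so that
// e.g. "kafka.bootstrap.servers" becomes the Kafka param "bootstrap.servers".
val options = Map(
  "kafka.bootstrap.servers" -> "localhost:9092", // made-up value
  "subscribe" -> "topic1",                       // a source option, not a Kafka param
  "kafka.security.protocol" -> "PLAINTEXT")

val kafkaParams: Map[String, String] = options
  .filter { case (key, _) => key.startsWith("kafka.") }
  .map { case (key, value) => key.stripPrefix("kafka.") -> value }

// Map(bootstrap.servers -> localhost:9092, security.protocol -> PLAINTEXT)
println(kafkaParams)
```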
failOnDataLoss¶
```scala
failOnDataLoss: Boolean
```
KafkaMicroBatchStream
is given failOnDataLoss
flag when created.
failOnDataLoss is the value of the failOnDataLoss option.

The failOnDataLoss flag is used for the following:

- reportDataLoss
- planInputPartitions (to create KafkaBatchInputPartitions)
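For example, the flag is set like any other option of the Kafka data source (the broker address and topic name below are placeholders):

```scala
// failOnDataLoss is a source option (not a kafka.-prefixed Kafka param)
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("failOnDataLoss", "false") // warn instead of failing the query on data loss
  .load()
```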
SupportsTriggerAvailableNow¶
KafkaMicroBatchStream
is a SupportsTriggerAvailableNow.
prepareForTriggerAvailableNow¶
```scala
prepareForTriggerAvailableNow(): Unit
```
prepareForTriggerAvailableNow
is part of the SupportsTriggerAvailableNow abstraction.
prepareForTriggerAvailableNow
sets the allDataForTriggerAvailableNow internal registry to fetchLatestOffsets (of the KafkaOffsetReader) for getOrCreateInitialPartitionOffsets.
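A minimal sketch of the behaviour described above (the names follow this page, but this is not a verbatim copy of the Spark sources):

```scala
// Pre-fetch and memoize the latest offsets so that a Trigger.AvailableNow
// query processes all the data available now and then stops.
override def prepareForTriggerAvailableNow(): Unit = {
  allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
    Some(getOrCreateInitialPartitionOffsets()))
}
```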
ReportsSourceMetrics¶
KafkaMicroBatchStream
is a ReportsSourceMetrics.
Performance Metrics¶
```scala
metrics(
  latestConsumedOffset: Optional[Offset]): Map[String, String]
```
metrics
is part of the ReportsSourceMetrics abstraction.
metrics
returns the metrics for the given latestConsumedOffset
and the latestPartitionOffsets.
metrics¶
```scala
metrics(
  latestConsumedOffset: Optional[Offset],
  latestAvailablePartitionOffsets: Map[TopicPartition, Long]): Map[String, String]
```
metrics
converts the given latestConsumedOffset
to a KafkaSourceOffset when defined.
metrics returns the following performance metrics, based on how far the offsets in the partitionToOffsets (of the given latestConsumedOffset) are behind the given latestAvailablePartitionOffsets:

- minOffsetsBehindLatest
- maxOffsetsBehindLatest
- avgOffsetsBehindLatest
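The calculation can be sketched with a standalone helper (offsetsBehindMetrics is a hypothetical name introduced for illustration):

```scala
import org.apache.kafka.common.TopicPartition

// Per partition, compute how far the consumed offset is behind the latest
// available one, then aggregate to min/max/avg metrics.
def offsetsBehindMetrics(
    consumed: Map[TopicPartition, Long],
    latestAvailable: Map[TopicPartition, Long]): Map[String, String] = {
  val behind = latestAvailable.map { case (tp, latest) => latest - consumed(tp) }
  if (behind.isEmpty) Map.empty
  else Map(
    "minOffsetsBehindLatest" -> behind.min.toString,
    "maxOffsetsBehindLatest" -> behind.max.toString,
    "avgOffsetsBehindLatest" -> (behind.sum.toDouble / behind.size).toString)
}
```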
Default Read Limit¶
```scala
getDefaultReadLimit: ReadLimit
```
getDefaultReadLimit
is part of the SupportsAdmissionControl abstraction.
getDefaultReadLimit
uses minOffsetsPerTrigger and maxOffsetsPerTrigger options to determine ReadLimit.
getDefaultReadLimit also uses the maxTriggerDelayMs value (of the maxTriggerDelay option), which has a default value and so is always available.
ReadLimit | Condition |
---|---|
CompositeReadLimit | Both minOffsetsPerTrigger and maxOffsetsPerTrigger defined |
ReadMinRows | Only minOffsetsPerTrigger defined |
ReadMaxRows | Only maxOffsetsPerTrigger defined |
ReadAllAvailable | Neither minOffsetsPerTrigger nor maxOffsetsPerTrigger defined |
In other words, with minOffsetsPerTrigger and maxOffsetsPerTrigger defined, getDefaultReadLimit
creates a CompositeReadLimit with the following:
- ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs)
- ReadMaxRows with maxOffsetsPerTrigger
With only minOffsetsPerTrigger defined (and no maxOffsetsPerTrigger), getDefaultReadLimit
creates a ReadMinRows with minOffsetsPerTrigger (and maxTriggerDelayMs).
Otherwise, getDefaultReadLimit
takes the maxOffsetsPerTrigger, if defined, and creates a ReadMaxRows
(with the approximate maximum rows to scan) or defaults to ReadAllAvailable.
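The decision table can be sketched with the factory methods of org.apache.spark.sql.connector.read.streaming.ReadLimit (a sketch of the rules above, not a verbatim copy of the Spark sources):

```scala
override def getDefaultReadLimit: ReadLimit = {
  if (minOffsetPerTrigger.isDefined && maxOffsetsPerTrigger.isDefined) {
    // Both limits defined: combine the minimum (with the max delay) and the maximum
    ReadLimit.compositeLimit(Array(
      ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs),
      ReadLimit.maxRows(maxOffsetsPerTrigger.get)))
  } else if (minOffsetPerTrigger.isDefined) {
    ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs)
  } else {
    // Only a maximum, if any; otherwise read everything available
    maxOffsetsPerTrigger.map(ReadLimit.maxRows(_)).getOrElse(ReadLimit.allAvailable())
  }
}
```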
maxOffsetsPerTrigger¶
```scala
maxOffsetsPerTrigger: Option[Long]
```
KafkaMicroBatchStream
takes the value of maxOffsetsPerTrigger option (in the options), if available, when created. Otherwise, maxOffsetsPerTrigger
is None
(undefined).
minOffsetPerTrigger¶
```scala
minOffsetPerTrigger: Option[Long]
```
When created, KafkaMicroBatchStream
takes the value of minOffsetsPerTrigger option (in the options), if available, or defaults to None
(undefined).
minOffsetPerTrigger
is used to determine the default limit on the number of records to read.
maxTriggerDelayMs¶
KafkaMicroBatchStream
reads the value of maxTriggerDelay option (in the options) when created.
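A sketch of how the three values above could be derived from the options at creation time (assuming options is the map of source options and the documented 15m default of maxTriggerDelay):

```scala
import org.apache.spark.network.util.JavaUtils

private val maxOffsetsPerTrigger: Option[Long] =
  Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

private val minOffsetPerTrigger: Option[Long] =
  Option(options.get("minOffsetsPerTrigger")).map(_.toLong)

// maxTriggerDelay accepts a time string (e.g. "15m") converted to milliseconds
private val maxTriggerDelayMs: Long =
  JavaUtils.timeStringAsMs(Option(options.get("maxTriggerDelay")).getOrElse("15m"))
```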
Latest Offset¶
```scala
latestOffset(
  start: Offset,
  readLimit: ReadLimit): Offset
```
latestOffset
is part of the SupportsAdmissionControl abstraction.
latestOffset
converts the given start offset to a KafkaSourceOffset to access its partitionToOffsets.
latestOffset
sets the latestPartitionOffsets internal registry to be as follows:
- allDataForTriggerAvailableNow, if available
- fetchLatestOffsets of the KafkaOffsetReader (for the partitionToOffsets of the given KafkaSourceOffset), otherwise
Note

allDataForTriggerAvailableNow is set in prepareForTriggerAvailableNow, i.e. when this KafkaMicroBatchStream runs with Trigger.AvailableNow.
latestOffset
requests the given ReadLimit for read limits if it is a CompositeReadLimit. Otherwise, latestOffset
uses the given ReadLimit as the only read limit.
latestOffset
determines the offsets to read based on the read limits.
- With ReadAllAvailable among the read limits, latestOffset uses the latestPartitionOffsets registry. ReadAllAvailable has the highest priority as it is necessary for Trigger.Once to work properly.
- With ReadMinRows among the read limits, latestOffset checks whether to skip this trigger or not (using the minRows and maxTriggerDelayMs of this ReadMinRows as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset). If there are not enough rows available (based on minRows) or maxTriggerDelayMs has not elapsed yet, latestOffset prints out the following DEBUG message to the logs:

  Delaying batch as number of records available is less than minOffsetsPerTrigger

- With ReadMaxRows among the read limits, latestOffset rateLimits the offsets to read (with the maxRows as well as the latestPartitionOffsets and the partitionToOffsets of the given KafkaSourceOffset).
- With neither ReadMinRows nor ReadMaxRows among the read limits, latestOffset uses the latestPartitionOffsets registry (as if ReadAllAvailable were among the read limits).
In the end, latestOffset
records the offsets in the endPartitionOffsets registry.
Summary

endPartitionOffsets can be as follows based on the read limits:

- latestPartitionOffsets for ReadAllAvailable
- partitionToOffsets of the given KafkaSourceOffset for ReadMinRows with a batch delayed
- rateLimit for ReadMaxRows
- latestPartitionOffsets, otherwise
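Putting the steps together, latestOffset can be sketched as follows (a simplified sketch of the dispatch above, assuming the registries and helpers named on this page; logging and error handling omitted):

```scala
override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
  val startOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

  // allDataForTriggerAvailableNow if available; fetchLatestOffsets otherwise
  latestPartitionOffsets = Option(allDataForTriggerAvailableNow)
    .getOrElse(kafkaOffsetReader.fetchLatestOffsets(Some(startOffsets)))

  // A CompositeReadLimit carries several read limits; anything else is the only one
  val limits: Seq[ReadLimit] = readLimit match {
    case composite: CompositeReadLimit => composite.getReadLimits.toSeq
    case single => Seq(single)
  }

  endPartitionOffsets =
    if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
      // Highest priority (required for Trigger.Once to work properly)
      latestPartitionOffsets
    } else {
      val minLimit = limits.collectFirst { case l: ReadMinRows => l }
      val maxLimit = limits.collectFirst { case l: ReadMaxRows => l }
      if (minLimit.exists(l => delayBatch(
          l.minRows, latestPartitionOffsets, startOffsets, l.maxTriggerDelayMs))) {
        // Batch delayed: stay at the start offsets
        startOffsets
      } else {
        maxLimit
          .map(l => rateLimit(l.maxRows, startOffsets, latestPartitionOffsets))
          .getOrElse(latestPartitionOffsets)
      }
    }
  KafkaSourceOffset(endPartitionOffsets)
}
```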
delayBatch¶
```scala
delayBatch(
  minLimit: Long,
  latestOffsets: Map[TopicPartition, Long],
  currentOffsets: Map[TopicPartition, Long],
  maxTriggerDelayMs: Long): Boolean
```
Summary

delayBatch is false (no delay) when either holds:

- The given maxTriggerDelayMs has passed since lastTriggerMillis
- The total of new records (across all topics and partitions, given latestOffsets and currentOffsets) is at least the given minLimit

Otherwise, delayBatch is true (delay).
Input Parameter | Value |
---|---|
minLimit | minOffsetPerTrigger |
latestOffsets | latestPartitionOffsets |
currentOffsets | Offsets by Partitions of the start offset (as given to latestOffset) |
maxTriggerDelayMs | maxTriggerDelayMs |
If the given maxTriggerDelayMs
has passed (since lastTriggerMillis), delayBatch
prints out the following DEBUG message to the logs, records the current timestamp in lastTriggerMillis registry and returns false
(no delay).
Maximum wait time is passed, triggering batch
Otherwise, delayBatch
calculates the number of new records (based on the given latestOffsets
and currentOffsets
).
If the number of new records is below the given minLimit
, delayBatch
returns true
(delay). Otherwise, delayBatch
records the current timestamp in lastTriggerMillis registry and returns false
(no delay).
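A sketch of delayBatch following the description above (a lastTriggerMillis field and a logDebug helper, e.g. from a Logging trait, are assumed):

```scala
private def delayBatch(
    minLimit: Long,
    latestOffsets: Map[TopicPartition, Long],
    currentOffsets: Map[TopicPartition, Long],
    maxTriggerDelayMs: Long): Boolean = {
  val now = System.currentTimeMillis()
  if (now - lastTriggerMillis >= maxTriggerDelayMs) {
    logDebug("Maximum wait time is passed, triggering batch")
    lastTriggerMillis = now
    false // no delay: the maximum trigger delay has elapsed
  } else {
    // Total of new records across all topic-partitions
    val newRecords = latestOffsets.flatMap { case (tp, latest) =>
      currentOffsets.get(tp).map(latest - _)
    }.sum
    if (newRecords < minLimit) {
      true // delay: not enough new records yet
    } else {
      lastTriggerMillis = now
      false
    }
  }
}
```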
reportDataLoss¶
```scala
reportDataLoss(message: String): Unit
```
With failOnDataLoss enabled, reportDataLoss
throws an IllegalStateException
(with the given message
):
[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.
If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false"
Otherwise, reportDataLoss
prints out the following WARN message (with the given message
) to the logs:
[message]. Some data may have been lost because they are not available in Kafka any more;
either the data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed.
If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true"
reportDataLoss
is used when:
KafkaMicroBatchStream
is requested to planInputPartitions and getOrCreateInitialPartitionOffsets
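A sketch of the control flow (the messages are abbreviated here; see the full texts above):

```scala
private def reportDataLoss(message: String): Unit = {
  if (failOnDataLoss) {
    // Fail the streaming query (full message shortened in this sketch)
    throw new IllegalStateException(
      s"$message. Some data may have been lost because they are not available in Kafka any more")
  } else {
    // Warn only and keep the query running
    logWarning(
      s"$message. Some data may have been lost because they are not available in Kafka any more")
  }
}
```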
String Representation¶
```scala
toString(): String
```
toString
is part of the Object
(Java) abstraction.
toString
is the following (with the string representation of the KafkaOffsetReader):
```text
KafkaV2[[kafkaOffsetReader]]
```
Logging¶
Enable ALL
logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchStream
logger to see what happens inside.
Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchStream=ALL
```
Refer to Logging.