KafkaOffsetReader¶
KafkaOffsetReader is an abstraction of Kafka offset readers.
Contract (Subset)¶
fetchEarliestOffsets¶
fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Used when:
- KafkaContinuousStream is requested to initialOffset, planInputPartitions
- KafkaMicroBatchStream is requested to getOrCreateInitialPartitionOffsets, rateLimit
- KafkaSource is requested for initialPartitionOffsets, rateLimit
fetchGlobalTimestampBasedOffsets¶
fetchGlobalTimestampBasedOffsets(
timestamp: Long,
isStartingOffsets: Boolean,
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value): KafkaSourceOffset
Used when:
- KafkaContinuousStream is requested to initialOffset
- KafkaMicroBatchStream is requested to getOrCreateInitialPartitionOffsets
- KafkaSource is requested for initialPartitionOffsets
fetchLatestOffsets¶
fetchLatestOffsets(
knownOffsets: Option[Map[TopicPartition, Long]]): Map[TopicPartition, Long]
Used when:
- KafkaContinuousStream is requested to initialOffset, needsReconfiguration
- KafkaMicroBatchStream is requested to latestOffset, getOrCreateInitialPartitionOffsets, prepareForTriggerAvailableNow
- KafkaSource is requested for initialPartitionOffsets, latestOffset, prepareForTriggerAvailableNow
fetchPartitionOffsets¶
fetchPartitionOffsets(
offsetRangeLimit: KafkaOffsetRangeLimit,
isStartingOffsets: Boolean): Map[TopicPartition, Long]
Used when:
- never used?!
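To make the shape of the return values concrete, the following is a minimal sketch of a toy in-memory reader that mirrors three of the signatures above. It is not the real trait: `OffsetReaderSketch`, `OffsetReader`, `InMemoryOffsetReader`, and the local `TopicPartition` case class are hypothetical stand-ins, so the example runs without Spark or Kafka on the classpath.

```scala
object OffsetReaderSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  // Mirrors the shape of part of the contract above (not the real KafkaOffsetReader)
  trait OffsetReader {
    def fetchEarliestOffsets(): Map[TopicPartition, Long]
    def fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
    def fetchLatestOffsets(
        knownOffsets: Option[Map[TopicPartition, Long]]): Map[TopicPartition, Long]
  }

  // Toy reader over a fixed (earliest, latest) offset pair per partition
  final class InMemoryOffsetReader(ranges: Map[TopicPartition, (Long, Long)])
      extends OffsetReader {

    // Earliest offset of every partition this toy reader knows about
    def fetchEarliestOffsets(): Map[TopicPartition, Long] =
      ranges.map { case (tp, (earliest, _)) => tp -> earliest }

    // Earliest offsets restricted to the given partitions
    def fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
      fetchEarliestOffsets().filter { case (tp, _) => newPartitions.contains(tp) }

    // knownOffsets is ignored in this toy; only the return shape is illustrated
    def fetchLatestOffsets(
        knownOffsets: Option[Map[TopicPartition, Long]]): Map[TopicPartition, Long] =
      ranges.map { case (tp, (_, latest)) => tp -> latest }
  }
}
```

Every method returns one offset per topic partition, which is the form the callers listed above consume.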
Implementations¶
- KafkaOffsetReaderAdmin
- KafkaOffsetReaderConsumer
Creating KafkaOffsetReader¶
build(
consumerStrategy: ConsumerStrategy,
driverKafkaParams: ju.Map[String, Object],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String): KafkaOffsetReader
build branches off based on the spark.sql.streaming.kafka.useDeprecatedOffsetFetching configuration property:
- When enabled, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderConsumer.
  Creating old and deprecated Consumer based offset reader
- Otherwise, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderAdmin.
  Creating new Admin based offset reader
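The branching described above can be modeled with a small, self-contained sketch. It only illustrates the decision and is not the actual Spark source: `BuildSketch`, `OffsetReader`, `ConsumerBasedReader`, and `AdminBasedReader` are hypothetical stand-ins, the flag is passed in directly instead of being read from the configuration property, and `println` takes the place of DEBUG logging.

```scala
object BuildSketch {
  // Hypothetical stand-ins for the two implementations
  sealed trait OffsetReader
  case object ConsumerBasedReader extends OffsetReader // models KafkaOffsetReaderConsumer
  case object AdminBasedReader extends OffsetReader    // models KafkaOffsetReaderAdmin

  // useDeprecatedOffsetFetching models the value of
  // spark.sql.streaming.kafka.useDeprecatedOffsetFetching
  def build(useDeprecatedOffsetFetching: Boolean): OffsetReader =
    if (useDeprecatedOffsetFetching) {
      // The real build logs this message at DEBUG level
      println("Creating old and deprecated Consumer based offset reader")
      ConsumerBasedReader
    } else {
      println("Creating new Admin based offset reader")
      AdminBasedReader
    }
}
```

With this model, `BuildSketch.build(true)` yields the consumer-based stand-in and `BuildSketch.build(false)` the admin-based one.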
build is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to buildScan
- KafkaSourceProvider is requested to create a Source
- KafkaScan is requested to create a MicroBatchStream or a ContinuousStream
Logging¶
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaOffsetReader logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL
Refer to Logging.