KafkaOffsetReader
KafkaOffsetReader is an abstraction of Kafka offset readers.
Contract (Subset)
fetchEarliestOffsets
fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Used when:
- KafkaContinuousStream is requested to initialOffset, planInputPartitions
- KafkaMicroBatchStream is requested to getOrCreateInitialPartitionOffsets, rateLimit
- KafkaSource is requested for initialPartitionOffsets, rateLimit
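To illustrate how the two variants differ, here is a minimal in-memory sketch in Scala, not Spark's implementation. It assumes a hypothetical FakeCluster that knows each partition's earliest (log start) offset, and TopicPartition is a simplified stand-in for Kafka's class. The no-argument variant answers for every partition; the other one, as assumed here, answers only for the requested newPartitions that actually exist and returns an empty map for an empty request.

```scala
// Simplified, hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical in-memory "cluster" holding the log start offset of every partition
// (the offset of the oldest record that is still retained)
final class FakeCluster(logStartOffsets: Map[TopicPartition, Long]) {

  // Mirrors fetchEarliestOffsets(): earliest offsets of all partitions
  def fetchEarliestOffsets(): Map[TopicPartition, Long] = logStartOffsets

  // Mirrors fetchEarliestOffsets(newPartitions): only the requested partitions,
  // skipping any the cluster does not know about (an assumption of this sketch)
  def fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
    if (newPartitions.isEmpty) Map.empty
    else logStartOffsets.filter { case (tp, _) => newPartitions.contains(tp) }
}

val t0 = TopicPartition("events", 0)
val t1 = TopicPartition("events", 1)
val cluster = new FakeCluster(Map(t0 -> 5L, t1 -> 0L))

println(cluster.fetchEarliestOffsets())                                    // both partitions
println(cluster.fetchEarliestOffsets(Seq(t1, TopicPartition("events", 2)))) // only events-1
```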
fetchGlobalTimestampBasedOffsets
fetchGlobalTimestampBasedOffsets(
timestamp: Long,
isStartingOffsets: Boolean,
strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value): KafkaSourceOffset
Used when:
- KafkaContinuousStream is requested to initialOffset
- KafkaMicroBatchStream is requested to getOrCreateInitialPartitionOffsets
- KafkaSource is requested for initialPartitionOffsets
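The following sketch shows what resolving one global timestamp to per-partition offsets could look like. It is an in-memory model under stated assumptions, not Spark's code: Rec and the logs and latest maps are hypothetical, StrategyOnNoMatchStartingOffset is a stand-in Enumeration with ERROR and LATEST values, every partition resolves to the first offset whose timestamp is at or after the given one (the semantics of Kafka's offsetsForTimes), and a partition without a match either fails (starting offsets with ERROR) or, as assumed for every other case, falls back to its latest offset. The real method returns a KafkaSourceOffset; the sketch returns a plain map.

```scala
// Hypothetical, simplified stand-ins (NOT the Spark/Kafka classes)
final case class TopicPartition(topic: String, partition: Int)

object StrategyOnNoMatchStartingOffset extends Enumeration {
  val ERROR, LATEST = Value
}

// A record's offset and its timestamp (ms); offsets ascend within a partition
final case class Rec(offset: Long, timestamp: Long)

// Sketch: resolve one ("global") timestamp for every partition
def fetchGlobalTimestampBasedOffsets(
    logs: Map[TopicPartition, Vector[Rec]],
    latest: Map[TopicPartition, Long],
    timestamp: Long,
    isStartingOffsets: Boolean,
    strategy: StrategyOnNoMatchStartingOffset.Value): Map[TopicPartition, Long] =
  logs.map { case (tp, recs) =>
    // First offset whose timestamp is >= the requested timestamp
    val matched = recs.find(_.timestamp >= timestamp).map(_.offset)
    val offset = matched.getOrElse {
      if (isStartingOffsets && strategy == StrategyOnNoMatchStartingOffset.ERROR)
        throw new IllegalStateException(s"No offset matched timestamp $timestamp in $tp")
      else latest(tp) // assumed fallback: the partition's latest offset
    }
    tp -> offset
  }

val p0 = TopicPartition("events", 0)
val p1 = TopicPartition("events", 1)
val logs = Map(
  p0 -> Vector(Rec(0, 1000), Rec(1, 2000), Rec(2, 3000)),
  p1 -> Vector(Rec(0, 500), Rec(1, 1500)))
val latest = Map(p0 -> 3L, p1 -> 2L)

// p0 matches offset 1; p1 has no record at or after 2000, so LATEST applies
println(fetchGlobalTimestampBasedOffsets(logs, latest, 2000L,
  isStartingOffsets = true, StrategyOnNoMatchStartingOffset.LATEST))
```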
fetchLatestOffsets
fetchLatestOffsets(
knownOffsets: Option[Map[TopicPartition, Long]]): Map[TopicPartition, Long]
Used when:
- KafkaContinuousStream is requested to initialOffset, needsReconfiguration
- KafkaMicroBatchStream is requested to latestOffset, getOrCreateInitialPartitionOffsets, prepareForTriggerAvailableNow
- KafkaSource is requested for initialPartitionOffsets, latestOffset, prepareForTriggerAvailableNow
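The knownOffsets parameter gives an implementation something to sanity-check a fresh answer against. A minimal sketch, assuming the check is "a fetched latest offset must not be smaller than an offset already known" with a bounded number of retries; this is modeled loosely on the idea, not on Spark's actual code, and fetchOnce and maxAttempts are hypothetical.

```scala
// Simplified, hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)

// Sketch: fetch latest offsets, retrying while any of them "went backwards"
// compared to knownOffsets. `fetchOnce` stands in for one round trip to the brokers.
def fetchLatestOffsets(
    fetchOnce: () => Map[TopicPartition, Long],
    knownOffsets: Option[Map[TopicPartition, Long]],
    maxAttempts: Int = 3): Map[TopicPartition, Long] = {

  // Partitions whose fetched latest offset is smaller than an already known offset
  def incorrect(fetched: Map[TopicPartition, Long]): List[TopicPartition] =
    knownOffsets.toList.flatMap { known =>
      fetched.collect { case (tp, off) if known.get(tp).exists(off < _) => tp }
    }

  @scala.annotation.tailrec
  def attempt(n: Int): Map[TopicPartition, Long] = {
    val fetched = fetchOnce()
    if (incorrect(fetched).isEmpty || n >= maxAttempts) fetched
    else attempt(n + 1)
  }

  attempt(1)
}

val tp = TopicPartition("events", 0)
// The first answer goes backwards (0 < known 40), the second one does not
val answers = Iterator(Map(tp -> 0L), Map(tp -> 42L))
val latest = fetchLatestOffsets(() => answers.next(), knownOffsets = Some(Map(tp -> 40L)))
println(latest) // accepted after one retry
```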
fetchPartitionOffsets
fetchPartitionOffsets(
offsetRangeLimit: KafkaOffsetRangeLimit,
isStartingOffsets: Boolean): Map[TopicPartition, Long]
Used when:
- never used?!
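For illustration only, the following sketch resolves a KafkaOffsetRangeLimit to per-partition offsets. The range-limit hierarchy below is a simplified, hypothetical stand-in, and the sketch assumes that earliest and latest limits are not resolved right away but recorded as the sentinels -2 and -1 (assumed here to match Spark's KafkaOffsetRangeLimit.EARLIEST and LATEST), while specific offsets must cover exactly the assigned partitions. Timestamp-based limits and the isStartingOffsets flag are left out.

```scala
// Simplified, hypothetical stand-ins (NOT the Spark/Kafka classes)
final case class TopicPartition(topic: String, partition: Int)

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
final case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long])
  extends KafkaOffsetRangeLimit

// Assumed sentinel values, to be resolved to real offsets later
object OffsetSentinels {
  val Latest = -1L
  val Earliest = -2L
}

// Sketch: map every assigned partition to an offset (or a sentinel)
def fetchPartitionOffsets(
    partitions: Set[TopicPartition],
    limit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = limit match {
  case EarliestOffsetRangeLimit => partitions.map(tp => tp -> OffsetSentinels.Earliest).toMap
  case LatestOffsetRangeLimit   => partitions.map(tp => tp -> OffsetSentinels.Latest).toMap
  case SpecificOffsetRangeLimit(offsets) =>
    // Specific offsets must name exactly the assigned partitions
    require(offsets.keySet == partitions,
      s"Specified partitions ${offsets.keySet} differ from assigned partitions $partitions")
    offsets
}

val p0 = TopicPartition("events", 0)
val p1 = TopicPartition("events", 1)
println(fetchPartitionOffsets(Set(p0, p1), EarliestOffsetRangeLimit))
println(fetchPartitionOffsets(Set(p0, p1), SpecificOffsetRangeLimit(Map(p0 -> 10L, p1 -> 7L))))
```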
Implementations
- KafkaOffsetReaderAdmin
- KafkaOffsetReaderConsumer
Creating KafkaOffsetReader
build(
consumerStrategy: ConsumerStrategy,
driverKafkaParams: ju.Map[String, Object],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String): KafkaOffsetReader
build branches off based on the spark.sql.streaming.kafka.useDeprecatedOffsetFetching configuration property:
- When enabled, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderConsumer:
  Creating old and deprecated Consumer based offset reader
- Otherwise, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderAdmin:
  Creating new Admin based offset reader
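The branching in build boils down to a single flag. A minimal sketch, with hypothetical stand-in types for the two implementations: the Boolean parameter stands in for the value of spark.sql.streaming.kafka.useDeprecatedOffsetFetching (reading the property and its default is left to Spark), and logDebug stands in for DEBUG logging.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins; the real classes take a consumer strategy, Kafka params, etc.
sealed trait KafkaOffsetReader
final class KafkaOffsetReaderConsumer extends KafkaOffsetReader
final class KafkaOffsetReaderAdmin extends KafkaOffsetReader

// Sketch of build's branching on the useDeprecatedOffsetFetching flag
def build(useDeprecatedOffsetFetching: Boolean, logDebug: String => Unit): KafkaOffsetReader =
  if (useDeprecatedOffsetFetching) {
    logDebug("Creating old and deprecated Consumer based offset reader")
    new KafkaOffsetReaderConsumer
  } else {
    logDebug("Creating new Admin based offset reader")
    new KafkaOffsetReaderAdmin
  }

// Usage: collect the DEBUG messages instead of printing them
val messages = ListBuffer.empty[String]
val reader = build(useDeprecatedOffsetFetching = false, msg => messages += msg)
val kind = reader match {
  case _: KafkaOffsetReaderAdmin    => "admin"
  case _: KafkaOffsetReaderConsumer => "consumer"
}
println(s"$kind: ${messages.mkString}")
```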
build is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to buildScan
- KafkaSourceProvider is requested to create a Source
- KafkaScan is requested to create a MicroBatchStream or a ContinuousStream
Logging
Enable ALL logging level for the org.apache.spark.sql.kafka010.KafkaOffsetReader logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL
Refer to Logging.