KafkaOffsetReader

KafkaOffsetReader is an abstraction of Kafka offset readers.

Contract (Subset)

fetchEarliestOffsets

fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(
  newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]

Used when:

fetchGlobalTimestampBasedOffsets

fetchGlobalTimestampBasedOffsets(
  timestamp: Long,
  isStartingOffsets: Boolean,
  strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value): KafkaSourceOffset

Used when:

fetchLatestOffsets

fetchLatestOffsets(
  knownOffsets: Option[Map[TopicPartition, Long]]): Map[TopicPartition, Long]

Used when:

fetchPartitionOffsets

fetchPartitionOffsets(
  offsetRangeLimit: KafkaOffsetRangeLimit,
  isStartingOffsets: Boolean): Map[TopicPartition, Long]

Used when:

  • No usages found (seemingly unused)

Implementations

  • KafkaOffsetReaderAdmin
  • KafkaOffsetReaderConsumer

Creating KafkaOffsetReader

build(
  consumerStrategy: ConsumerStrategy,
  driverKafkaParams: ju.Map[String, Object],
  readerOptions: CaseInsensitiveMap[String],
  driverGroupIdPrefix: String): KafkaOffsetReader

build branches on the value of the spark.sql.streaming.kafka.useDeprecatedOffsetFetching configuration property:

  • When enabled, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderConsumer.

    Creating old and deprecated Consumer based offset reader
    
  • Otherwise, build prints out the following DEBUG message to the logs and creates a KafkaOffsetReaderAdmin.

    Creating new Admin based offset reader
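
The branching described above can be sketched roughly as follows. This is a minimal illustration, not Spark's actual code: OffsetReaderSketch, ConsumerBasedReader, AdminBasedReader and BuildSketch are hypothetical stand-ins for the real classes, a plain Map stands in for the SQL configuration, and the default value is passed in explicitly because the text does not state it.

```scala
// Hypothetical stand-ins for the real Spark classes. Only the names
// KafkaOffsetReaderConsumer and KafkaOffsetReaderAdmin come from the text.
sealed trait OffsetReaderSketch
case object ConsumerBasedReader extends OffsetReaderSketch // stands in for KafkaOffsetReaderConsumer
case object AdminBasedReader extends OffsetReaderSketch    // stands in for KafkaOffsetReaderAdmin

object BuildSketch {
  // The configuration property named in the text.
  val ConfKey = "spark.sql.streaming.kafka.useDeprecatedOffsetFetching"

  // `conf` is a plain Map standing in for Spark's configuration (an assumption).
  // `default` is a parameter because the text does not say what the default is.
  // Returns the chosen reader together with the DEBUG message quoted in the text.
  def build(conf: Map[String, String], default: Boolean): (OffsetReaderSketch, String) = {
    val useDeprecated = conf.get(ConfKey).map(_.toBoolean).getOrElse(default)
    if (useDeprecated) {
      (ConsumerBasedReader, "Creating old and deprecated Consumer based offset reader")
    } else {
      (AdminBasedReader, "Creating new Admin based offset reader")
    }
  }
}
```

Calling BuildSketch.build(Map(BuildSketch.ConfKey -> "true"), default = false) selects the consumer-based stand-in, while an empty Map with default = false selects the admin-based one. Modelling the result as a sealed trait keeps the two alternatives explicit, which mirrors the either-or choice that build makes.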
    

build is used when:

Logging

Enable the ALL logging level for the org.apache.spark.sql.kafka010.KafkaOffsetReader logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL

Refer to Logging.