
KafkaOffsetReader relies on the ConsumerStrategy to <>.

KafkaOffsetReader <> with the group ID (ConsumerConfig.GROUP_ID_CONFIG) configuration explicitly set to <> (i.e. the given <> followed by <>).

KafkaOffsetReader is <> when:

[[options]]
.KafkaOffsetReader's Options
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| fetchOffset.numRetries
a| [[fetchOffset.numRetries]] Number of attempts to fetch offsets before giving up

Default: 3

| fetchOffset.retryIntervalMs
a| [[fetchOffset.retryIntervalMs]] How long (in milliseconds) to wait before retrying to fetch offsets

Default: 1000
|===
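A minimal sketch of how such options could be resolved against their documented defaults. This is an illustration only, not KafkaOffsetReader's actual code; the `intOption` helper and the sample option map are assumptions.

```scala
// Hypothetical helper: read an integer option by name, falling back
// to the documented default when the option is absent.
def intOption(options: Map[String, String], name: String, default: Int): Int =
  options.get(name).map(_.trim.toInt).getOrElse(default)

// Sample options, as name-value pairs (an assumption for illustration).
val readerOptions = Map("fetchOffset.numRetries" -> "5")

val maxOffsetFetchAttempts =
  intOption(readerOptions, "fetchOffset.numRetries", default = 3)
val offsetFetchAttemptIntervalMs =
  intOption(readerOptions, "fetchOffset.retryIntervalMs", default = 1000)
```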


[[kafkaSchema]] KafkaOffsetReader defines a fixed, predefined schema.

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/

Refer to <>.
====

=== [[creating-instance]] Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following to be created:

* [[consumerStrategy]] ConsumerStrategy
* [[driverKafkaParams]] Kafka parameters (as name-value pairs) that are used exclusively to <>
* [[readerOptions]] Options (as name-value pairs)
* [[driverGroupIdPrefix]] Prefix of the group ID

KafkaOffsetReader initializes the <>.

=== [[nextGroupId]] nextGroupId Internal Method

[source, scala]
----
nextGroupId(): String
----

nextGroupId sets the <> to be the <> followed by `-` and the <> (i.e. `[driverGroupIdPrefix]-[nextId]`).

In the end, nextGroupId increments the <> and returns the <>.
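The group-ID generation described above can be sketched in plain Scala. The `GroupIdGenerator` class is a hypothetical stand-in for KafkaOffsetReader's internal state, not the actual implementation:

```scala
// Sketch: group IDs are the driver group-ID prefix, a dash, and a
// monotonically increasing counter that starts at 0.
class GroupIdGenerator(driverGroupIdPrefix: String) {
  private var nextId = 0                       // initially 0
  private var groupId: String = null

  def nextGroupId(): String = {
    groupId = s"$driverGroupIdPrefix-$nextId"  // [driverGroupIdPrefix]-[nextId]
    nextId += 1                                // increment for the next call
    groupId
  }
}
```

Each call yields a fresh, unique group ID, so every newly created Kafka consumer gets its own consumer group.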

NOTE: nextGroupId is used exclusively when KafkaOffsetReader is requested for a <>.

=== [[resetConsumer]] resetConsumer Internal Method

[source, scala]
----
resetConsumer(): Unit
----


NOTE: resetConsumer is used when...FIXME

=== [[fetchTopicPartitions]] fetchTopicPartitions Method

[source, scala]
----
fetchTopicPartitions(): Set[TopicPartition]
----


fetchTopicPartitions is used when KafkaRelation is requested for getPartitionOffsets.

=== [[fetchEarliestOffsets]] Fetching Earliest Offsets -- fetchEarliestOffsets Method

[source, scala]
----
fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
----


NOTE: fetchEarliestOffsets is used when KafkaSource is requested to rateLimit and to generate a DataFrame for a batch (when new partitions have been assigned).

=== [[fetchLatestOffsets]] Fetching Latest Offsets -- fetchLatestOffsets Method

[source, scala]
----
fetchLatestOffsets(): Map[TopicPartition, Long]
----


NOTE: fetchLatestOffsets is used when KafkaSource gets offsets or initialPartitionOffsets is initialized.

=== [[withRetriesWithoutInterrupt]] withRetriesWithoutInterrupt Internal Method

[source, scala]
----
withRetriesWithoutInterrupt(
  body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
----
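Assuming the retry behavior is governed by the fetchOffset.numRetries and fetchOffset.retryIntervalMs options above (i.e. retry the body up to a maximum number of attempts, sleeping between attempts), the retry shape might look like the following sketch. The uninterruptible-thread aspect is deliberately omitted; only the retry loop is shown, and `withRetries` itself is a hypothetical name:

```scala
// Hedged sketch of a bounded retry loop: run body, and on failure retry
// up to maxAttempts times, pausing intervalMs milliseconds in between.
def withRetries[T](maxAttempts: Int, intervalMs: Long)(body: => T): T = {
  var attempt = 1
  var result: Option[T] = None
  var lastException: Throwable = null
  while (result.isEmpty && attempt <= maxAttempts) {
    try {
      result = Some(body)                // success: stop retrying
    } catch {
      case e: Exception =>
        lastException = e
        attempt += 1
        if (attempt <= maxAttempts) Thread.sleep(intervalMs)
    }
  }
  // All attempts exhausted: rethrow the last failure.
  result.getOrElse(throw lastException)
}
```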


NOTE: withRetriesWithoutInterrupt is used when...FIXME

=== [[fetchSpecificOffsets]] Fetching Offsets for Selected TopicPartitions -- fetchSpecificOffsets Method

[source, scala]
----
fetchSpecificOffsets(
  partitionOffsets: Map[TopicPartition, Long],
  reportDataLoss: String => Unit): KafkaSourceOffset
----

.KafkaOffsetReader's fetchSpecificOffsets
image::images/KafkaOffsetReader-fetchSpecificOffsets.png[align="center"]

fetchSpecificOffsets requests the <> to poll(0).

fetchSpecificOffsets requests the <> for assigned partitions (using Consumer.assignment()).

fetchSpecificOffsets requests the <> to pause(partitions).

You should see the following DEBUG message in the logs:

----
DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
----

For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the <> to:

* seekToEnd for the latest (aka -1)
* seekToBeginning for the earliest (aka -2)
* seek for other offsets

In the end, fetchSpecificOffsets creates a collection of Kafka's TopicPartition and position (using the <>).
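The per-partition seek decision described above can be sketched as a small pattern match. The sentinel encoding (-1 for latest, -2 for earliest) comes from the list above; the `SeekAction` ADT itself is an illustration, not Spark's actual types:

```scala
// Sketch: map a requested offset to the consumer seek it implies.
sealed trait SeekAction
case object SeekToEnd extends SeekAction        // latest (aka -1)
case object SeekToBeginning extends SeekAction  // earliest (aka -2)
final case class SeekTo(offset: Long) extends SeekAction  // an exact offset

def seekActionFor(offset: Long): SeekAction = offset match {
  case -1L => SeekToEnd
  case -2L => SeekToBeginning
  case off => SeekTo(off)
}
```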

fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.

=== [[createConsumer]] Creating Kafka Consumer -- createConsumer Internal Method

[source, scala]
----
createConsumer(): Consumer[Array[Byte], Array[Byte]]
----

createConsumer requests <> to create a Kafka Consumer with <> and <>.
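Assuming the driver Kafka parameters are plain name-value pairs and the freshly generated group ID overrides any configured "group.id" (the key behind ConsumerConfig.GROUP_ID_CONFIG), assembling the consumer configuration might be sketched as follows. `consumerParams` is a hypothetical helper for illustration:

```scala
// Sketch: take the driver's Kafka parameters and force the group.id
// to the newly generated group ID before creating the consumer.
def consumerParams(
    driverKafkaParams: Map[String, Object],
    newGroupId: String): Map[String, Object] =
  driverKafkaParams + ("group.id" -> newGroupId)
```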

NOTE: createConsumer is used when KafkaOffsetReader is <> (and initializes <>) and <>

=== [[consumer]] Creating Kafka Consumer (Unless Already Available) -- consumer Method

[source, scala]
----
consumer: Consumer[Array[Byte], Array[Byte]]
----

consumer gives the cached <<_consumer, Kafka Consumer>> or creates one itself.

NOTE: consumer is used (to access the internal <<_consumer, Kafka Consumer>>) in the fetch methods, which gives the property of creating a new Kafka Consumer whenever the internal <<_consumer, Kafka Consumer>> reference becomes null, i.e. as in <>.


NOTE: consumer is used when KafkaOffsetReader is requested to <>, <>, <>, and <>.
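The create-unless-available pattern above can be sketched generically. The `Cached` class is a hypothetical stand-in: it returns the cached value, or creates a fresh one when the cached reference has been cleared (as resetConsumer or stopConsumer would do with the Kafka Consumer):

```scala
// Sketch: lazily (re)create a resource whenever the cached reference
// has been cleared; otherwise hand back the cached instance.
final class Cached[T](create: () => T) {
  private var cached: Option[T] = None

  def get: T = {
    if (cached.isEmpty) cached = Some(create())  // (re)create on demand
    cached.get
  }

  def reset(): Unit = cached = None              // next get creates anew
}
```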

=== [[close]] Closing -- close Method

[source, scala]
----
close(): Unit
----

close <> (if the <<_consumer, Kafka Consumer>> is available).

close requests the <> to shut down.

close is used when:

=== [[runUninterruptibly]] runUninterruptibly Internal Method

[source, scala]
----
runUninterruptibly[T](body: => T): T
----


NOTE: runUninterruptibly is used when...FIXME

=== [[stopConsumer]] stopConsumer Internal Method

[source, scala]
----
stopConsumer(): Unit
----


NOTE: stopConsumer is used when...FIXME

=== [[toString]] Textual Representation -- toString Method

[source, scala]
----
toString: String
----

NOTE: toString is part of the java.lang.Object contract for the string representation of the object.


=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| _consumer
a| [[_consumer]] Kafka Consumer (Consumer[Array[Byte], Array[Byte]])

<> when KafkaOffsetReader is <>.

Used when KafkaOffsetReader:

* <>
* <>
* <>
* <>
* <>
* <>

| execContext
a| [[execContext]] scala.concurrent.ExecutionContextExecutorService

| groupId
a| [[groupId]]

| kafkaReaderThread
a| [[kafkaReaderThread]] java.util.concurrent.ExecutorService

| maxOffsetFetchAttempts
a| [[maxOffsetFetchAttempts]] Maximum number of attempts to fetch offsets (per the <<fetchOffset.numRetries, fetchOffset.numRetries>> option)

| nextId
a| [[nextId]]

Initially 0

| offsetFetchAttemptIntervalMs
a| [[offsetFetchAttemptIntervalMs]] How long to wait between offset-fetch attempts (per the <<fetchOffset.retryIntervalMs, fetchOffset.retryIntervalMs>> option)

|===