KafkaOffsetReader is used to query a Kafka cluster for partition offsets.

KafkaOffsetReader is created when KafkaSourceProvider is requested to create a Kafka source or relation (for batch and streaming queries).

[[toString]] When requested for the human-readable text representation (aka toString), KafkaOffsetReader simply requests the <<consumerStrategy, ConsumerStrategy>> for one.

[[options]]
.KafkaOffsetReader's Options
[cols="1,1,2",options="header",width="100%"]
|===
| Name
| Default Value
| Description

| [[fetchOffset.numRetries]] fetchOffset.numRetries
| 3
| Number of attempts to fetch partition offsets before giving up

| [[fetchOffset.retryIntervalMs]] fetchOffset.retryIntervalMs
| 1000
| How long (in milliseconds) to wait before retrying to fetch partition offsets
|===
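The two options above can be pictured as lookups in the reader options map (a minimal sketch; the helper names are illustrative, while the keys and defaults are those from the table):

```scala
// Resolving the two retry-related options from the reader options
// (keys and defaults as in the table above; helper names are illustrative).
def maxOffsetFetchAttempts(readerOptions: Map[String, String]): Int =
  readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt

def offsetFetchAttemptIntervalMs(readerOptions: Map[String, String]): Long =
  readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
```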

[[internal-registries]]
.KafkaOffsetReader's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| consumer
a| [[consumer]] Kafka Consumer (with keys and values of Array[Byte] type)

<<createConsumer, Created>> when KafkaOffsetReader is created.

Used when KafkaOffsetReader is requested to:

* <<fetchTopicPartitions, fetchTopicPartitions>>
* <<fetchEarliestOffsets, fetchEarliestOffsets>>
* <<fetchLatestOffsets, fetchLatestOffsets>>
* <<resetConsumer, resetConsumer>>
* <<close, close>>

| execContext
| [[execContext]] Scala ExecutionContext backed by the single-threaded <<kafkaReaderThread, kafkaReaderThread>>

| groupId
| [[groupId]] Group id of the <<consumer, Kafka Consumer>> (generated with <<nextGroupId, nextGroupId>>)

| kafkaReaderThread
| [[kafkaReaderThread]] Single-threaded executor to run Kafka consumer operations on

| maxOffsetFetchAttempts
| [[maxOffsetFetchAttempts]] The value of the <<fetchOffset.numRetries, fetchOffset.numRetries>> option

| nextId
| [[nextId]] Counter used to generate the next <<groupId, group id>> (starts at 0)

| offsetFetchAttemptIntervalMs
| [[offsetFetchAttemptIntervalMs]] The value of the <<fetchOffset.retryIntervalMs, fetchOffset.retryIntervalMs>> option
|===


[TIP]
====
Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/

Refer to [Logging].
====

=== [[createConsumer]] Creating Kafka Consumer -- createConsumer Internal Method

[source, scala]
----
createConsumer(): Consumer[Array[Byte], Array[Byte]]
----

createConsumer requests the <<consumerStrategy, ConsumerStrategy>> to create a Kafka Consumer with the <<driverKafkaParams, driverKafkaParams>> and a new <<nextGroupId, group id>>.

createConsumer is used when KafkaOffsetReader is created (and initializes the <<consumer, consumer>>) and is requested to <<resetConsumer, resetConsumer>>.

=== [[creating-instance]] Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following to be created:

  • [[consumerStrategy]] ConsumerStrategy
  • [[driverKafkaParams]] Kafka parameters (as Map[String, Object])
  • [[readerOptions]] Reader options (as Map[String, String])
  • [[driverGroupIdPrefix]] Prefix for the group id

=== [[close]] close Method

[source, scala]
----
close(): Unit
----


NOTE: close is used when...FIXME

=== [[fetchEarliestOffsets]] fetchEarliestOffsets Method

[source, scala]
----
fetchEarliestOffsets(): Map[TopicPartition, Long]
----


NOTE: fetchEarliestOffsets is used when...FIXME

=== [[fetchEarliestOffsets-newPartitions]] fetchEarliestOffsets Method

[source, scala]
----
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
----


NOTE: fetchEarliestOffsets is used when...FIXME

=== [[fetchLatestOffsets]] fetchLatestOffsets Method

[source, scala]
----
fetchLatestOffsets(): Map[TopicPartition, Long]
----


NOTE: fetchLatestOffsets is used when...FIXME
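The earliest/latest fetches can be pictured with a minimal stand-in for the Kafka Consumer API. In the sketch below, TopicPartition and StubConsumer are illustrative stand-ins for the Kafka client types, while poll, assignment, pause, seekToBeginning, seekToEnd and position mirror the real Consumer methods:

```scala
import scala.collection.mutable

// Stand-in types; Spark itself uses org.apache.kafka.common.TopicPartition
// and the real Kafka Consumer.
final case class TopicPartition(topic: String, partition: Int)

final class StubConsumer(
    assigned: Set[TopicPartition],
    earliest: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long]) {
  private val positions = mutable.Map.empty[TopicPartition, Long]
  def poll(timeoutMs: Long): Unit = ()
  def assignment(): Set[TopicPartition] = assigned
  def pause(ps: Set[TopicPartition]): Unit = ()
  def seekToBeginning(ps: Set[TopicPartition]): Unit =
    ps.foreach(p => positions(p) = earliest(p))
  def seekToEnd(ps: Set[TopicPartition]): Unit =
    ps.foreach(p => positions(p) = latest(p))
  def position(p: TopicPartition): Long = positions(p)
}

// fetchEarliestOffsets: poll, pause, seek to the beginning, read positions.
def fetchEarliestOffsets(c: StubConsumer): Map[TopicPartition, Long] = {
  c.poll(0)
  val ps = c.assignment()
  c.pause(ps)
  c.seekToBeginning(ps)
  ps.map(p => p -> c.position(p)).toMap
}

// fetchLatestOffsets: the same, but seeking to the end.
def fetchLatestOffsets(c: StubConsumer): Map[TopicPartition, Long] = {
  c.poll(0)
  val ps = c.assignment()
  c.pause(ps)
  c.seekToEnd(ps)
  ps.map(p => p -> c.position(p)).toMap
}
```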

=== [[fetchTopicPartitions]] Fetching (and Pausing) Assigned Kafka TopicPartitions -- fetchTopicPartitions Method

[source, scala]
----
fetchTopicPartitions(): Set[TopicPartition]
----

fetchTopicPartitions uses <<runUninterruptibly, runUninterruptibly>> to do the following:

. Requests the <<consumer, Kafka Consumer>> to poll (fetch data) for the topics and partitions (with 0 timeout)

. Requests the <<consumer, Kafka Consumer>> for the set of partitions currently assigned

. Requests the <<consumer, Kafka Consumer>> to suspend fetching from (pause) the partitions assigned

In the end, fetchTopicPartitions returns the TopicPartitions assigned (and paused).

fetchTopicPartitions is used when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan) through getPartitionOffsets.
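The three steps above can be sketched with a minimal stand-in consumer (TopicPartition and StubConsumer are illustrative stand-ins for the Kafka client types; poll, assignment and pause are the real Consumer method names):

```scala
import scala.collection.mutable

final case class TopicPartition(topic: String, partition: Int)

final class StubConsumer(assigned: Set[TopicPartition]) {
  val paused = mutable.Set.empty[TopicPartition]
  def poll(timeoutMs: Long): Unit = ()                      // step 1: fetch data
  def assignment(): Set[TopicPartition] = assigned          // step 2: partitions assigned
  def pause(ps: Set[TopicPartition]): Unit = paused ++= ps  // step 3: suspend fetching
}

def fetchTopicPartitions(c: StubConsumer): Set[TopicPartition] = {
  c.poll(0)
  val partitions = c.assignment()
  c.pause(partitions)
  partitions  // the TopicPartitions assigned (and paused)
}
```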

=== [[nextGroupId]] nextGroupId Internal Method

[source, scala]
----
nextGroupId(): String
----


NOTE: nextGroupId is used when...FIXME
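A sketch of the idea behind nextGroupId, assuming the group id is derived from the driverGroupIdPrefix and the nextId counter (the wrapper class is illustrative, not part of KafkaOffsetReader):

```scala
// Group ids as driverGroupIdPrefix plus a monotonically increasing counter
// (a sketch; the wrapper class is illustrative).
final class GroupIdGenerator(driverGroupIdPrefix: String) {
  private var nextId = 0
  def nextGroupId(): String = {
    val groupId = s"$driverGroupIdPrefix-$nextId"
    nextId += 1
    groupId
  }
}
```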

=== [[resetConsumer]] resetConsumer Internal Method

[source, scala]
----
resetConsumer(): Unit
----


NOTE: resetConsumer is used when...FIXME

=== [[runUninterruptibly]] runUninterruptibly Internal Method

[source, scala]
----
runUninterruptibly[T](body: => T): T
----


NOTE: runUninterruptibly is used when...FIXME
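One way to picture runUninterruptibly is handing the body off to the dedicated single-threaded execContext and waiting for the result (a minimal sketch under that assumption, not the actual implementation):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A dedicated single-threaded executor, standing in for kafkaReaderThread.
val kafkaReaderThread = Executors.newSingleThreadExecutor()
implicit val execContext: ExecutionContext =
  ExecutionContext.fromExecutorService(kafkaReaderThread)

// Run the body on the Kafka reader thread and wait for the result.
def runUninterruptibly[T](body: => T): T =
  Await.result(Future(body), Duration.Inf)
```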

=== [[withRetriesWithoutInterrupt]] withRetriesWithoutInterrupt Internal Method

[source, scala]
----
withRetriesWithoutInterrupt(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
----


NOTE: withRetriesWithoutInterrupt is used when...FIXME
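The retry behaviour can be sketched as follows, assuming body is attempted up to maxOffsetFetchAttempts times with offsetFetchAttemptIntervalMs between attempts (TopicPartition is a stand-in for the Kafka client type, and the simplified name withRetries is illustrative):

```scala
final case class TopicPartition(topic: String, partition: Int)

// Retry body up to maxOffsetFetchAttempts times, sleeping
// offsetFetchAttemptIntervalMs between failed attempts (a sketch).
def withRetries(
    maxOffsetFetchAttempts: Int,
    offsetFetchAttemptIntervalMs: Long)(
    body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  var result: Option[Map[TopicPartition, Long]] = None
  var lastException: Throwable = null
  var attempt = 0
  while (result.isEmpty && attempt < maxOffsetFetchAttempts) {
    attempt += 1
    try result = Some(body)
    catch {
      case e: Exception =>
        lastException = e
        if (attempt < maxOffsetFetchAttempts) Thread.sleep(offsetFetchAttemptIntervalMs)
    }
  }
  result.getOrElse(throw lastException)
}
```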