
Kafka Options

Options with the kafka. prefix (e.g. kafka.bootstrap.servers) are considered configuration properties of the Kafka consumers used on the driver and executors.

Kafka options are defined as part of KafkaSourceProvider.

assign

Topic subscription strategy that accepts a JSON with topic names and partitions, e.g.

{"topicA":[0,1],"topicB":[0,1]}

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).
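
For illustration, a minimal sketch of the assign strategy (assuming an existing SparkSession spark; the broker address, topic names and partitions are placeholders):

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""")
  .load()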

endingOffsets

endingTimestamp

endingOffsetsByTimestamp

failOnDataLoss

Default: true

Used when:

fetchOffset.numRetries

fetchOffset.retryIntervalMs

groupIdPrefix

includeHeaders

Default: false

kafka.bootstrap.servers

(required) bootstrap.servers configuration property of the Kafka consumers used on the driver and executors

Default: (empty)
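
For example (assuming an existing SparkSession spark; the broker address and topic name are placeholders):

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()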

kafkaConsumer.pollTimeoutMs

The time (in milliseconds) spent waiting in Consumer.poll if data is not available in the buffer.

Default: spark.network.timeout or 120s
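
For example, to wait at most 5 seconds in Consumer.poll (the value is arbitrary, for illustration only):

option("kafkaConsumer.pollTimeoutMs", "5000")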

maxOffsetsPerTrigger

Maximum number of records (offsets) to fetch and process per trigger, used to rate-limit micro-batches.

Default: (undefined)

Unless defined, KafkaSource requests KafkaOffsetReader for the latest offsets.
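
For example, to cap every micro-batch at (roughly) 1000 records in total (the limit is arbitrary, for illustration only):

option("maxOffsetsPerTrigger", "1000")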

maxTriggerDelay

Default: 15m

For streaming queries only (ignored in batch queries)

Used when:

minOffsetsPerTrigger

Minimum number of records to read in a micro-batch

Default: (undefined)

For streaming queries only (ignored in batch queries)

Validated in validateGeneralOptions

minOffsetsPerTrigger vs minOffsetPerTrigger

The option's name is minOffsetsPerTrigger (with an s), while the code refers to it as minOffsetPerTrigger (a singular offset).

Used when:
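
For example, to wait for at least 100 new records per micro-batch but no longer than 5 minutes (both values are arbitrary, for illustration only; maxTriggerDelay only applies when minOffsetsPerTrigger is set):

option("minOffsetsPerTrigger", "100")
option("maxTriggerDelay", "5m")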

minPartitions

Minimum number of partitions per executor (given Kafka partitions)

Default: (undefined)

Must be undefined (default) or greater than 0

When undefined (default) or smaller than the number of TopicPartitions with records to consume, KafkaMicroBatchReader uses KafkaOffsetRangeCalculator to find the preferred executor for every TopicPartition (given the available executors).
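
For example, to set the minimum number of partitions to 10 (the number is arbitrary, for illustration only):

option("minPartitions", "10")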

startingOffsets

Starting offsets

Default: latest

Possible values:

  • latest

  • earliest

  • JSON with topics, partitions and their starting offsets, e.g.

    {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}

    In the JSON, -1 denotes the latest offset and -2 the earliest offset.

Tip

Use Scala's triple quotes for the JSON with topics, partitions and offsets.

option(
  "startingOffsets",
  """{"topic1":{"0":5,"4":-1},"topic2":{"0":-2}}""")

startingOffsetsByTimestamp

startingTimestamp

subscribe

Topic subscription strategy (SubscribeStrategy) that accepts topic names as a comma-separated string, e.g.

topic1,topic2,topic3

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).
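
For example (the topic names are placeholders):

option("subscribe", "topic1,topic2,topic3")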

subscribePattern

Topic subscription strategy that accepts a regular expression (Java's java.util.regex.Pattern) matching the topics to subscribe to, e.g.

topic\d

Tip

Use Scala's triple quotes for the topic subscription regex pattern.

option("subscribepattern", """topic\d""")

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).

topic

Optional topic name to write the result of a streaming query to

Default: (empty)

Unless defined, the Kafka data source uses the topic names as given in the topic column of the dataset
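
For example, a minimal sketch of writing a streaming query to a single topic (assuming records is an existing streaming Dataset with key and value columns; the broker address, topic name and checkpoint location are placeholders):

records.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()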