# KafkaSourceProvider

`KafkaSourceProvider` is the entry point to the `kafka` data source.

`KafkaSourceProvider` is a SimpleTableProvider (and does not support custom table schema and partitioning).

!!! note
    `KafkaSourceProvider` is also a `StreamSourceProvider` and a `StreamSinkProvider` to be used in Spark Structured Streaming.

    Learn more about `StreamSourceProvider` and `StreamSinkProvider` in The Internals of Spark Structured Streaming online book.
## DataSourceRegister

`KafkaSourceProvider` is a DataSourceRegister and registers itself as the `kafka` format.

`KafkaSourceProvider` uses the `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` file for the registration (available in the source code of Apache Spark).
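The short name makes the data source available via `format("kafka")`. For illustration, a minimal batch read (the broker address and topic name are placeholders):

```scala
// Read a Kafka topic as a batch DataFrame using the registered "kafka" format
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "demo-topic")                   // placeholder topic
  .load()
records.printSchema()
```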
## KafkaTable

```scala
getTable(
  options: CaseInsensitiveStringMap): KafkaTable
```

`getTable` is part of the SimpleTableProvider abstraction.

`getTable` creates a KafkaTable with the `includeHeaders` flag based on the `includeHeaders` option.
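The `includeHeaders` option is given on the reader; for illustration (broker and topic are placeholders):

```scala
// Include Kafka record headers in the loaded DataFrame (adds a "headers" column)
val withHeaders = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "demo-topic")                   // placeholder
  .option("includeHeaders", "true")
  .load()
```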
## Creating Relation for Reading (RelationProvider)

```scala
createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
```

`createRelation` is part of the RelationProvider abstraction.
`createRelation` starts by validating the Kafka options for batch queries (in the input `parameters`).

`createRelation` collects all `kafka.`-prefixed options (in the input `parameters`) and creates a local `specifiedKafkaParams` with the keys without the `kafka.` prefix (e.g. `kafka.whatever` becomes simply `whatever`).
`createRelation` gets the desired KafkaOffsetRangeLimit for the `startingoffsets` offset option key (in the given `parameters`), with EarliestOffsetRangeLimit as the default offsets.

`createRelation` makes sure that the KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an `AssertionError`.
`createRelation` gets the desired KafkaOffsetRangeLimit for the `endingoffsets` offset option key (in the given `parameters`), with LatestOffsetRangeLimit as the default offsets.

`createRelation` makes sure that the KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an `AssertionError`.
In the end, `createRelation` creates a KafkaRelation with the consumer strategy and the failOnDataLoss option (based on the given `parameters`), the `specifiedKafkaParams`, and the starting and ending offsets.
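Putting the options together, a batch query over a Kafka topic could look as follows (all values are placeholders):

```scala
// Batch read with explicit offset range limits; kafka.-prefixed options
// are passed to the Kafka consumer with the "kafka." prefix stripped.
val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // becomes bootstrap.servers
  .option("subscribe", "demo-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```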
## Creating Relation for Writing (CreatableRelationProvider)

```scala
createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  df: DataFrame): BaseRelation
```

`createRelation` is part of the CreatableRelationProvider abstraction.
`createRelation` gets the `topic` option from the input `parameters`.

`createRelation` gets the Kafka producer parameters from the input `parameters`.
`createRelation` then uses the `KafkaWriter` helper object to write the rows of the DataFrame to the Kafka topic.

In the end, `createRelation` creates a fake BaseRelation that simply throws an `UnsupportedOperationException` for all its methods.

`createRelation` supports Append and ErrorIfExists save modes only. `createRelation` throws an `AnalysisException` for the other save modes:

Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).
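For illustration, assuming `df` is a DataFrame with `key` and `value` columns (broker and topic names are placeholders), a batch write to Kafka looks as follows:

```scala
import org.apache.spark.sql.SaveMode

// Write key/value rows to a Kafka topic; only Append and ErrorIfExists are allowed
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("topic", "demo-topic")                       // placeholder
  .mode(SaveMode.Append)
  .save()
```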
## Kafka Configuration Properties for Driver

```scala
kafkaParamsForDriver(
  specifiedKafkaParams: Map[String, String]): ju.Map[String, Object]
```

`kafkaParamsForDriver` is a utility to define the required Kafka configuration parameters for the driver.

`kafkaParamsForDriver` is used when:

- `KafkaBatch` is requested to `planInputPartitions`
- `KafkaRelation` is requested to `buildScan`
- `KafkaSourceProvider` is requested to `createSource` (for Spark Structured Streaming)
- `KafkaScan` is requested to `toMicroBatchStream` and `toContinuousStream` (for Spark Structured Streaming)
### auto.offset.reset

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

- `earliest` - automatically reset the offset to the earliest offset
- `latest` - automatically reset the offset to the latest offset
- `none` - throw an exception to the Kafka consumer if no previous offset is found for the consumer's group
- anything else - throw an exception to the Kafka consumer

Value: `earliest`

`ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`
### enable.auto.commit

If `true`, the Kafka consumer's offset will be periodically committed in the background.

Value: `false`

`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`
### key.deserializer

Deserializer class for keys that implements the Kafka `Deserializer` interface.

Value: `org.apache.kafka.common.serialization.ByteArrayDeserializer`

`ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG`
### max.poll.records

The maximum number of records returned in a single call to `Consumer.poll()`.

Value: `1`

`ConsumerConfig.MAX_POLL_RECORDS_CONFIG`
### receive.buffer.bytes

Only set if not set already.

Value: `65536`

`ConsumerConfig.RECEIVE_BUFFER_CONFIG`
### value.deserializer

Deserializer class for values that implements the Kafka `Deserializer` interface.

Value: `org.apache.kafka.common.serialization.ByteArrayDeserializer`

`ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG`
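A minimal sketch of what the driver-side parameters amount to, based on the properties above (the actual implementation uses an internal `ConfigUpdater` builder; the plain `java.util.Map` below is only an illustration):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Illustration only: Kafka parameters as set up for the driver,
// on top of the user-specified kafka.-prefixed options.
def driverParamsSketch(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
  params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  params.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
  if (!params.containsKey("receive.buffer.bytes")) {
    params.put("receive.buffer.bytes", "65536") // only set if not set already
  }
  params
}
```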
!!! tip
    Enable `ALL` logging level for the `org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater` logger to see updates of Kafka configuration parameters.

    Add the following lines to `conf/log4j2.properties`:

    ```text
    logger.ConfigUpdater.name = org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater
    logger.ConfigUpdater.level = all
    ```

    Refer to Logging.
## kafkaParamsForExecutors

```scala
kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): ju.Map[String, Object]
```

`kafkaParamsForExecutors`...FIXME
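While the description above is still a FIXME, a rough sketch of the executor-side consumer parameters one would expect; treat the exact keys and values as an assumption, not the actual implementation:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Assumption: executors deserialize raw bytes, use a group id derived from
// uniqueGroupId, never auto-commit, and fail on missing offsets
// (offsets are resolved by the driver).
def executorParamsSketch(
    specifiedKafkaParams: Map[String, String],
    uniqueGroupId: String): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
  params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.GROUP_ID_CONFIG, uniqueGroupId) // assumption: group id based on uniqueGroupId
  params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  params
}
```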
`kafkaParamsForExecutors` is used when:

- `KafkaBatch` is requested to `planInputPartitions`
- `KafkaRelation` is requested to `buildScan`
- `KafkaSourceProvider` is requested to `createSource` (for Spark Structured Streaming)
- `KafkaScan` is requested to `toMicroBatchStream` and `toContinuousStream` (for Spark Structured Streaming)
## kafkaParamsForProducer

```scala
kafkaParamsForProducer(
  params: CaseInsensitiveMap[String]): ju.Map[String, Object]
```

`kafkaParamsForProducer`...FIXME
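Also a FIXME in the text; as an assumption, the producer parameters are expected to be the `kafka.`-prefixed options with the prefix stripped plus byte-array serializers, roughly (a plain `Map` stands in for `CaseInsensitiveMap` to keep the sketch self-contained):

```scala
import java.{util => ju}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer

// Assumption: keys and values are handed to the Kafka producer as raw bytes.
def producerParamsSketch(params: Map[String, String]): ju.Map[String, Object] = {
  val kafkaParams = params.collect {
    case (k, v) if k.toLowerCase.startsWith("kafka.") => k.drop("kafka.".length) -> v
  }
  val result = new ju.HashMap[String, Object]()
  kafkaParams.foreach { case (k, v) => result.put(k, v) }
  result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
  result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
  result
}
```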
`kafkaParamsForProducer` is used when:

- `KafkaSourceProvider` is requested to create a relation for writing (and `createSink` for Spark Structured Streaming)
- `KafkaTable` is requested for a WriteBuilder
## Validating Kafka Options for Batch Queries

```scala
validateBatchOptions(
  params: CaseInsensitiveMap[String]): Unit
```

`validateBatchOptions` gets the desired KafkaOffsetRangeLimit for the `startingoffsets` option in the given case-insensitive parameters, with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.
`validateBatchOptions` then matches the returned KafkaOffsetRangeLimit as follows (see the example after this list):

- EarliestOffsetRangeLimit is acceptable and `validateBatchOptions` simply does nothing
- LatestOffsetRangeLimit is not acceptable and `validateBatchOptions` throws an `IllegalArgumentException`: `starting offset can't be latest for batch queries on Kafka`
- SpecificOffsetRangeLimit is acceptable unless one of the offsets is `-1L`, for which `validateBatchOptions` throws an `IllegalArgumentException`: `startingOffsets for [tp] can't be latest for batch queries on Kafka`
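For instance, a batch query that asks for the latest starting offsets is rejected (broker and topic are placeholders):

```scala
// Throws IllegalArgumentException:
// starting offset can't be latest for batch queries on Kafka
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "demo-topic")                   // placeholder
  .option("startingOffsets", "latest")
  .load()
```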
`validateBatchOptions` is used when:

- `KafkaSourceProvider` is requested for a relation for reading
- `KafkaScan` is requested for a Batch
## Getting Desired KafkaOffsetRangeLimit (for Offset Option)

```scala
getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
```

`getKafkaOffsetRangeLimit` tries to find the given `offsetOptionKey` in the input `params` and converts the value found to a KafkaOffsetRangeLimit as follows:
- `latest` becomes LatestOffsetRangeLimit
- `earliest` becomes EarliestOffsetRangeLimit
- For a JSON text, `getKafkaOffsetRangeLimit` uses the `JsonUtils` helper object to read per-TopicPartition offsets from it and creates a SpecificOffsetRangeLimit

When the input `offsetOptionKey` is not found, `getKafkaOffsetRangeLimit` returns the input `defaultOffsets`.
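A minimal sketch of that conversion (the KafkaOffsetRangeLimit types and `JsonUtils` are the internal objects described above and are assumed to be in scope; the pattern match is an illustration, not the actual code):

```scala
// Illustration of converting an offset option value to a KafkaOffsetRangeLimit
def offsetRangeLimitSketch(
    params: Map[String, String],
    offsetOptionKey: String,
    defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit =
  params.get(offsetOptionKey).map(_.trim) match {
    case Some(offset) if offset.equalsIgnoreCase("latest")   => LatestOffsetRangeLimit
    case Some(offset) if offset.equalsIgnoreCase("earliest") => EarliestOffsetRangeLimit
    case Some(json)                                          => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    case None                                                => defaultOffsets
  }
```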
`getKafkaOffsetRangeLimit` is used when:

- `KafkaSourceProvider` is requested for a relation for reading and `createSource` (Spark Structured Streaming)
- `KafkaScan` is requested for a Batch, `toMicroBatchStream` (Spark Structured Streaming) and `toContinuousStream` (Spark Structured Streaming)
## ConsumerStrategy

```scala
strategy(
  params: CaseInsensitiveMap[String]): ConsumerStrategy
```

`strategy` finds one of the strategy options: `subscribe`, `subscribepattern` and `assign`.

For `assign`, `strategy` uses the `JsonUtils` helper object to deserialize TopicPartitions from JSON (e.g. `{"topicA":[0,1],"topicB":[0,1]}`) and returns a new AssignStrategy.

For `subscribe`, `strategy` splits the value by `,` (comma) and returns a new SubscribeStrategy.

For `subscribepattern`, `strategy` returns a new SubscribePatternStrategy.
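The three options map to the three strategies and only one of them can be used per query; for example (topic names and broker address are placeholders):

```scala
// Exactly one of subscribe, subscribePattern or assign per query
val bySubscribe = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA,topicB")                    // SubscribeStrategy
  .load()

val byPattern = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")                   // SubscribePatternStrategy
  .load()

val byAssign = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""") // AssignStrategy
  .load()
```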
`strategy` is used when:

- `KafkaSourceProvider` is requested for a relation for reading and `createSource` (Spark Structured Streaming)
- `KafkaScan` is requested for a Batch, `toMicroBatchStream` (Spark Structured Streaming) and `toContinuousStream` (Spark Structured Streaming)
## failOnDataLoss Option

```scala
failOnDataLoss(
  params: CaseInsensitiveMap[String]): Boolean
```

`failOnDataLoss` is the value of the `failOnDataLoss` key in the given case-insensitive parameters (options), if available, or `true`.
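In other words, a minimal sketch of the lookup (the lower-cased key mirrors the case-insensitive parameters and is an assumption about the exact lookup, not the actual code):

```scala
// Defaults to true when the option is absent
def failOnDataLossSketch(params: Map[String, String]): Boolean =
  params.get("failondataloss").map(_.toBoolean).getOrElse(true)
```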
`failOnDataLoss` is used when:

- `KafkaSourceProvider` is requested for a relation for reading (and `createSource` for Spark Structured Streaming)
- `KafkaScan` is requested for a Batch (and `toMicroBatchStream` and `toContinuousStream` for Spark Structured Streaming)