KafkaSourceProvider¶
KafkaSourceProvider is the entry point to the kafka data source.
KafkaSourceProvider is a SimpleTableProvider (and does not support custom table schema and partitioning).
Note
KafkaSourceProvider is also a StreamSourceProvider and a StreamSinkProvider to be used in Spark Structured Streaming.
Learn more on StreamSourceProvider and StreamSinkProvider in The Internals of Spark Structured Streaming online book.
DataSourceRegister¶
KafkaSourceProvider is a DataSourceRegister and registers itself as kafka format.
KafkaSourceProvider uses the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file for the registration (available in the source code of Apache Spark).
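Thanks to the registration, a query can refer to the data source by its short kafka name. A minimal sketch (the broker address and topic name are placeholders):

```scala
// "kafka" resolves to KafkaSourceProvider via the DataSourceRegister service loader,
// so the fully-qualified class name is not needed.
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                        // placeholder topic
  .load()
```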
KafkaTable¶
getTable(
options: CaseInsensitiveStringMap): KafkaTable
getTable is part of the SimpleTableProvider abstraction.
getTable creates a KafkaTable with the includeHeaders flag based on the includeHeaders option.
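For example, a batch query can request Kafka record headers with the includeHeaders option (the broker address and topic name below are placeholders):

```scala
// With includeHeaders enabled, the loaded DataFrame carries a headers column
// in addition to key, value, topic, partition, offset, timestamp and timestampType.
val records = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                        // placeholder topic
  .option("includeHeaders", "true")
  .load()
records.select("key", "value", "headers").show(truncate = false)
```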
Creating Relation for Reading (RelationProvider)¶
createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
createRelation is part of the RelationProvider abstraction.
createRelation starts by validating the Kafka options for batch queries (in the input parameters).
createRelation collects all kafka.-prefixed key options (in the input parameters) and creates a local specifiedKafkaParams with the keys without the kafka. prefix (e.g. kafka.whatever is simply whatever).
createRelation gets the desired KafkaOffsetRangeLimit with the startingoffsets offset option key (in the given parameters) and EarliestOffsetRangeLimit as the default offsets.
createRelation makes sure that the starting KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an AssertionError.
createRelation then gets the desired KafkaOffsetRangeLimit, this time with the endingoffsets offset option key (in the given parameters) and LatestOffsetRangeLimit as the default offsets.
createRelation makes sure that the ending KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an AssertionError.
In the end, createRelation creates a KafkaRelation with the ConsumerStrategy, the specified Kafka parameters, the failOnDataLoss option, and the starting and ending offsets (all based on the input parameters).
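Putting it together, a batch query served by this createRelation could look as follows (the broker address and topic name are placeholders):

```scala
// startingOffsets must not be latest and endingOffsets must not be earliest
// for batch queries (see the assertions above).
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                        // placeholder topic
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```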
Creating Relation for Writing (CreatableRelationProvider)¶
createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation
createRelation is part of the CreatableRelationProvider abstraction.
createRelation gets the topic option from the input parameters.
createRelation then gets the Kafka parameters for the producer (kafkaParamsForProducer) from the input parameters.
createRelation then uses the KafkaWriter helper object to write the rows of the DataFrame to the Kafka topic.
In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.
createRelation supports Append and ErrorIfExists only. createRelation throws an AnalysisException for the other save modes:
Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).
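For illustration, a batch write that goes through this createRelation could look as follows (a sketch; df is assumed to be an existing DataFrame, and the broker address and topic name are placeholders):

```scala
import org.apache.spark.sql.SaveMode

// Keys and values are written as strings (binary works too);
// only Append and ErrorIfExists save modes are allowed.
df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("topic", "topic1")                            // placeholder topic
  .mode(SaveMode.Append)
  .save()
```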
Kafka Configuration Properties for Driver¶
kafkaParamsForDriver(
specifiedKafkaParams: Map[String, String]): ju.Map[String, Object]
kafkaParamsForDriver is a utility that defines the required Kafka configuration parameters for the driver (a sketch follows the property reference below).
kafkaParamsForDriver is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to buildScan
- KafkaSourceProvider is requested to createSource (Spark Structured Streaming)
- KafkaScan is requested to toMicroBatchStream and toContinuousStream (Spark Structured Streaming)
auto.offset.reset¶
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest - automatically reset the offset to the earliest offset
- latest - automatically reset the offset to the latest offset
- none - throw an exception to the Kafka consumer if no previous offset is found for the consumer's group
- anything else - throw an exception to the Kafka consumer
Value: earliest
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
enable.auto.commit¶
If true, the Kafka consumer's offset will be periodically committed in the background
Value: false
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
key.deserializer¶
Deserializer class for keys that implements the Kafka Deserializer interface.
Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
max.poll.records¶
The maximum number of records returned in a single call to Consumer.poll()
Value: 1
ConsumerConfig.MAX_POLL_RECORDS_CONFIG
receive.buffer.bytes¶
Only set if not set already
Value: 65536
ConsumerConfig.RECEIVE_BUFFER_CONFIG
value.deserializer¶
Deserializer class for values that implements the Kafka Deserializer interface.
Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
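The following is a minimal sketch, not the actual Spark implementation, of how kafkaParamsForDriver could assemble the parameters listed above on top of the user-specified kafka.-prefixed options:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// A sketch only: start from the user-specified (kafka.-prefix-stripped) options
// and apply the driver-side overrides described above.
def driverKafkaParams(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
  // Keys and values are always read as raw bytes
  params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  // Spark manages offsets itself, so auto-commit is disabled and the reset policy is earliest
  params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  // The driver does not need the data itself, so it polls at most one record at a time
  params.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
  // receive.buffer.bytes is only set if not set already
  if (!params.containsKey(ConsumerConfig.RECEIVE_BUFFER_CONFIG)) {
    params.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "65536")
  }
  params
}
```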
Tip
Enable ALL logging level for the org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater logger to see the updates of Kafka configuration parameters.
Add the following lines to conf/log4j2.properties:
logger.KafkaSourceProviderConfigUpdater.name = org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater
logger.KafkaSourceProviderConfigUpdater.level = all
Refer to Logging.
kafkaParamsForExecutors¶
kafkaParamsForExecutors(
specifiedKafkaParams: Map[String, String],
uniqueGroupId: String): ju.Map[String, Object]
kafkaParamsForExecutors...FIXME
kafkaParamsForExecutors is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to buildScan
- KafkaSourceProvider is requested to createSource (Spark Structured Streaming)
- KafkaScan is requested to toMicroBatchStream and toContinuousStream (Spark Structured Streaming)
kafkaParamsForProducer¶
kafkaParamsForProducer(
params: CaseInsensitiveMap[String]): ju.Map[String, Object]
kafkaParamsForProducer...FIXME
kafkaParamsForProducer is used when:
- KafkaSourceProvider is requested to create a relation for writing (and createSink for Spark Structured Streaming)
- KafkaTable is requested for a WriteBuilder
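As a minimal sketch (an assumption, not the actual implementation), the producer parameters could be assembled from the user-specified options with the serializers fixed to raw bytes:

```scala
import java.{util => ju}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer

// A sketch only: the Kafka producer used for writing always serializes
// keys and values as byte arrays.
def producerKafkaParams(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
  params.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
  params.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
  params
}
```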
Validating Kafka Options for Batch Queries¶
validateBatchOptions(
params: CaseInsensitiveMap[String]): Unit
validateBatchOptions gets the desired KafkaOffsetRangeLimit for the startingoffsets offset option (in the given params) with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.
validateBatchOptions then matches the returned KafkaOffsetRangeLimit as follows:
- EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing
- LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException: starting offset can't be latest for batch queries on Kafka
- SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L, for which validateBatchOptions throws an IllegalArgumentException: startingOffsets for [tp] can't be latest for batch queries on Kafka
validateBatchOptions is used when:
- KafkaSourceProvider is requested for a relation for reading
- KafkaScan is requested for a Batch
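For example, a batch query that requests the latest starting offsets fails validation (the broker address and topic name are placeholders):

```scala
// Fails with the IllegalArgumentException quoted above:
// starting offset can't be latest for batch queries on Kafka
spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                        // placeholder topic
  .option("startingOffsets", "latest")
  .load()
```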
Getting Desired KafkaOffsetRangeLimit (for Offset Option)¶
getKafkaOffsetRangeLimit(
params: Map[String, String],
offsetOptionKey: String,
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit as follows:
- latest becomes LatestOffsetRangeLimit
- earliest becomes EarliestOffsetRangeLimit
- For a JSON text, getKafkaOffsetRangeLimit uses the JsonUtils helper object to read per-TopicPartition offsets from it and creates a SpecificOffsetRangeLimit
When the input offsetOptionKey was not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
getKafkaOffsetRangeLimit is used when:
- KafkaSourceProvider is requested for a relation for reading and createSource (Spark Structured Streaming)
- KafkaScan is requested for a Batch, toMicroBatchStream (Spark Structured Streaming) and toContinuousStream (Spark Structured Streaming)
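For example, per-TopicPartition offsets can be given as JSON, where -2 stands for earliest and -1 for latest (the broker address and topic name are placeholders):

```scala
// startingOffsets/endingOffsets given as JSON become a SpecificOffsetRangeLimit.
val ranged = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")       // placeholder broker
  .option("assign", """{"topicA":[0,1]}""")                   // placeholder topic partitions
  .option("startingOffsets", """{"topicA":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"topicA":{"0":50,"1":-1}}""")
  .load()
```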
ConsumerStrategy¶
strategy(
params: CaseInsensitiveMap[String]): ConsumerStrategy
strategy finds one of the strategy options: subscribe, subscribepattern and assign.
For assign, strategy uses the JsonUtils helper object to deserialize TopicPartitions from JSON (e.g. {"topicA":[0,1],"topicB":[0,1]}) and returns a new AssignStrategy.
For subscribe, strategy splits the value by , (comma) and returns a new SubscribeStrategy.
For subscribepattern, strategy returns a new SubscribePatternStrategy.
strategy is used when:
- KafkaSourceProvider is requested for a relation for reading and createSource (Spark Structured Streaming)
- KafkaScan is requested for a Batch, toMicroBatchStream (Spark Structured Streaming) and toContinuousStream (Spark Structured Streaming)
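The three strategies map to the following data source options (a sketch; the broker address and topic names are placeholders):

```scala
// Exactly one of assign, subscribe or subscribepattern is expected per query.
val assigned = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""") // AssignStrategy
  .load()

val subscribed = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")                     // SubscribeStrategy
  .load()

val patternSubscribed = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "topic.*")                    // SubscribePatternStrategy
  .load()
```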
failOnDataLoss Option¶
failOnDataLoss(
params: CaseInsensitiveMap[String]): Boolean
failOnDataLoss is the value of the failOnDataLoss key in the given case-insensitive parameters (options), if available, or true (the default).
failOnDataLoss is used when:
- KafkaSourceProvider is requested for a relation for reading (and createSource for Spark Structured Streaming)
- KafkaScan is requested for a Batch (and toMicroBatchStream and toContinuousStream for Spark Structured Streaming)
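A minimal usage sketch (the broker address and topic name are placeholders):

```scala
// With failOnDataLoss disabled, a query does not fail when data may have been lost
// (e.g. a topic was deleted or offsets are out of range).
val tolerant = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                        // placeholder topic
  .option("failOnDataLoss", "false")
  .load()
```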