# KafkaSourceProvider
KafkaSourceProvider is the entry point (provider) to the built-in Kafka support in Spark Structured Streaming (and Spark SQL).
KafkaSourceProvider is a DataSourceRegister (Spark SQL) that registers itself under the kafka alias.
KafkaSourceProvider supports micro-batch stream processing (through the MicroBatchStream abstraction) using a specialized KafkaMicroBatchStream.
## Short Name (Alias)

```scala
shortName(): String
```
shortName is part of the DataSourceRegister abstraction.
shortName is kafka.
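For example, the alias is what you pass to the format method; the following (with illustrative bootstrap servers and topic name) resolves to KafkaSourceProvider:

```scala
// "kafka" resolves to KafkaSourceProvider (servers and topic are illustrative)
// spark is a SparkSession
val kafkaBatchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load
```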
## Kafka Consumer Properties on Executors

```scala
kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): Map[String, Object]
```
kafkaParamsForExecutors sets the Kafka properties for the Kafka Consumers on executors.
kafkaParamsForExecutors creates a KafkaConfigUpdater for the executor module (with the given specifiedKafkaParams).
kafkaParamsForExecutors sets (overrides) the following Kafka properties explicitly (in the KafkaConfigUpdater).
| ConsumerConfig's Key | Value | Note |
|---|---|---|
| key.deserializer | ByteArrayDeserializer | |
| value.deserializer | ByteArrayDeserializer | |
| auto.offset.reset | none | |
| group.id | [uniqueGroupId]-executor | setIfUnset |
| enable.auto.commit | false | |
| receive.buffer.bytes | 65536 | setIfUnset |
In the end, kafkaParamsForExecutors requests the KafkaConfigUpdater to build a Kafka configuration.
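A hedged sketch of the resulting executor-side consumer configuration, assuming none of the setIfUnset keys were specified by the user (the uniqueGroupId value is illustrative):

```scala
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Sketch only: what the executor-side consumer configuration is expected to contain,
// following the table above (not the actual KafkaConfigUpdater implementation)
val uniqueGroupId = "spark-kafka-source-...-..."  // illustrative
val executorKafkaParams: Map[String, Object] = Map(
  "key.deserializer"     -> classOf[ByteArrayDeserializer].getName,
  "value.deserializer"   -> classOf[ByteArrayDeserializer].getName,
  "auto.offset.reset"    -> "none",
  "group.id"             -> s"$uniqueGroupId-executor",
  "enable.auto.commit"   -> "false",
  "receive.buffer.bytes" -> (65536: java.lang.Integer)
)
```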
kafkaParamsForExecutors is used when:
- KafkaSourceProvider is requested to createSource (for a KafkaSource)
- KafkaScan is requested to toMicroBatchStream (to create a KafkaMicroBatchStream) and toContinuousStream (for a KafkaContinuousStream)
- KafkaBatch is requested to planInputPartitions (for KafkaBatchInputPartitions)
- KafkaRelation is requested to buildScan (for a KafkaSourceRDD)
## Unique Group ID for Batch Queries

```scala
batchUniqueGroupId(
  params: CaseInsensitiveMap[String]): String
```
batchUniqueGroupId takes GROUP_ID_PREFIX, if specified, or defaults to the spark-kafka-relation prefix to build the following group ID:

```text
[groupIdPrefix]-[randomUUID]
```
batchUniqueGroupId is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to build a Scan
## Unique Group ID for Streaming Queries

```scala
streamingUniqueGroupId(
  params: CaseInsensitiveMap[String],
  metadataPath: String): String
```
streamingUniqueGroupId takes GROUP_ID_PREFIX, if specified, or defaults to the spark-kafka-source prefix to build the following group ID:

```text
[groupIdPrefix]-[randomUUID]-[metadataPath.hashCode]
```
streamingUniqueGroupId is used when:
- KafkaSourceProvider is requested to create a Source
- KafkaScan is requested to toMicroBatchStream and toContinuousStream
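A minimal sketch of how the two group IDs could be assembled, based on the formats above (the helper names mirror the methods but this is not the actual implementation):

```scala
import java.util.UUID

// Sketch only: group IDs in the formats shown above
def batchUniqueGroupId(groupIdPrefix: String = "spark-kafka-relation"): String =
  s"$groupIdPrefix-${UUID.randomUUID}"

def streamingUniqueGroupId(metadataPath: String, groupIdPrefix: String = "spark-kafka-source"): String =
  s"$groupIdPrefix-${UUID.randomUUID}-${metadataPath.hashCode}"
```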
## Required Options
KafkaSourceProvider requires the following options (that you can set using option method of DataStreamReader or DataStreamWriter):
- Exactly one of the following options: subscribe, subscribePattern or assign
- kafka.bootstrap.servers
Tip: Refer to Kafka Data Source's Options for the supported configuration options.
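For example (with illustrative servers and topics), a streaming query sets exactly one subscription strategy; setting two of them at once (say, subscribe and assign) fails option validation:

```scala
// Exactly one of subscribe, subscribePattern or assign (values are illustrative)
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load
```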
## Creating KafkaTable

```scala
getTable(
  options: CaseInsensitiveStringMap): KafkaTable
```
getTable creates a KafkaTable with the value of the includeheaders option (default: false).
getTable is part of the SimpleTableProvider abstraction (Spark SQL).
## Creating Streaming Sink

```scala
createSink(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  partitionColumns: Seq[String],
  outputMode: OutputMode): Sink
```
createSink creates a KafkaSink for the topic option (if defined) and the Kafka Producer parameters.
createSink is part of the StreamSinkProvider abstraction.
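For example, a streaming write through this sink could look as follows (the input DataFrame, servers, topic and checkpoint path are all illustrative):

```scala
// input is a streaming DataFrame with key and value columns (e.g. from the read example above)
val query = input
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")                            // default target topic for this query
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // illustrative path
  .start()
```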
## Creating Streaming Source

```scala
createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
```
createSource is part of the StreamSourceProvider abstraction.
createSource validates stream options.
createSource...FIXME
## Validating Options For Batch And Streaming Queries

```scala
validateGeneralOptions(
  parameters: Map[String, String]): Unit
```
Note: Parameters are case-insensitive, i.e. OptioN and option are equal.
validateGeneralOptions makes sure that exactly one topic subscription strategy is used in parameters, which can be one of the following:

- subscribe
- subscribepattern
- assign

validateGeneralOptions reports an IllegalArgumentException when there is no subscription strategy in use or more than one strategy is used.
validateGeneralOptions makes sure that the values of the subscription strategies meet the following requirements:

- assign strategy starts with { (the opening curly brace)
- subscribe strategy has at least one topic (in a comma-separated list of topics)
- subscribepattern strategy has the pattern defined
validateGeneralOptions makes sure that group.id has not been specified and reports an IllegalArgumentException otherwise.
```text
Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
```
validateGeneralOptions makes sure that auto.offset.reset has not been specified and reports an IllegalArgumentException otherwise.
```text
Kafka option 'auto.offset.reset' is not supported.
Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.
```
validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:
- kafka.key.deserializer
- kafka.value.deserializer
- kafka.enable.auto.commit
- kafka.interceptor.classes
In the end, validateGeneralOptions makes sure that kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.
```text
Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer
```
validateGeneralOptions is used when KafkaSourceProvider validates options for streaming and batch queries.
## Creating ConsumerStrategy

```scala
strategy(
  caseInsensitiveParams: Map[String, String]): ConsumerStrategy
```
strategy converts a key (in caseInsensitiveParams) to a ConsumerStrategy.
| Key | ConsumerStrategy |
|---|---|
| assign | AssignStrategy |
| subscribe | SubscribeStrategy |
| subscribepattern | SubscribePatternStrategy |
strategy is used when...FIXME
### AssignStrategy
AssignStrategy with Kafka TopicPartitions
strategy uses the JsonUtils.partitions method to parse a JSON with topic names and partitions, e.g.

```json
{"topicA":[0,1],"topicB":[0,1]}
```
The topic names and partitions are mapped directly to Kafka's TopicPartition objects.
### SubscribeStrategy
SubscribeStrategy with topic names
strategy extracts topic names from a comma-separated string, e.g.
```text
topic1,topic2,topic3
```
### SubscribePatternStrategy

SubscribePatternStrategy with a topic subscription regex pattern (using Java's java.util.regex.Pattern), e.g.

```text
topic\d
```
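For example, the strategy is selected based on which of the three subscription options you set (servers, topics and partitions below are illustrative):

```scala
// Exactly one of the three subscription options selects the ConsumerStrategy
// (all values below are illustrative; spark is a SparkSession)
def kafkaReader = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")

val assigned   = kafkaReader.option("assign", """{"topicA":[0,1],"topicB":[0,1]}""").load  // AssignStrategy
val subscribed = kafkaReader.option("subscribe", "topic1,topic2,topic3").load              // SubscribeStrategy
val byPattern  = kafkaReader.option("subscribePattern", "topic\\d").load                   // SubscribePatternStrategy
```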
## Name and Schema of Streaming Source

```scala
sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
```
sourceSchema gives the short name (i.e. kafka) and the fixed schema.
Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.
When the input schema is defined, sourceSchema reports an IllegalArgumentException.

```text
Kafka source has a fixed schema and cannot be set with a custom one
```
sourceSchema is part of the StreamSourceProvider abstraction.
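As a reference, the fixed schema of the kafka data source is expected to look as follows when printed for a query (connection values are illustrative):

```scala
// Printing the fixed schema of a Kafka-sourced DataFrame (values are illustrative)
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load

kafkaDF.printSchema
// Expected output:
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```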
## Validating Kafka Options for Streaming Queries

```scala
validateStreamOptions(
  caseInsensitiveParams: Map[String, String]): Unit
```
validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.

```text
ending offset not valid in streaming queries
```
validateStreamOptions validates the general options.
validateStreamOptions is used when KafkaSourceProvider is requested for the schema for Kafka source and to create a KafkaSource.
## Converting Configuration Options to KafkaOffsetRangeLimit

```scala
getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
```
getKafkaOffsetRangeLimit finds the given offsetOptionKey in the params and does the following conversion:
- latest becomes LatestOffsetRangeLimit
- earliest becomes EarliestOffsetRangeLimit
- A JSON-formatted text becomes SpecificOffsetRangeLimit
- When the given offsetOptionKey is not found, getKafkaOffsetRangeLimit returns the given defaultOffsets
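For example, the startingOffsets and endingOffsets options (which reach this method as offsetOptionKey values) accept either a keyword or JSON-formatted text; the per-partition offsets below are illustrative, with -2 standing for earliest and -1 for latest:

```scala
// Batch read with explicit offset range limits (topic, servers and offsets are illustrative)
val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .option("startingOffsets", """{"topicA":{"0":23,"1":-2}}""")  // SpecificOffsetRangeLimit
  .option("endingOffsets", "latest")                            // LatestOffsetRangeLimit
  .load
```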
getKafkaOffsetRangeLimit is used when:
- KafkaSourceProvider is requested to createSource, createMicroBatchReader, createContinuousReader, createRelation, and validateBatchOptions
## Creating Fake BaseRelation

```scala
createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
```
createRelation...FIXME
createRelation is part of the RelationProvider abstraction (Spark SQL).
## Validating Configuration Options for Batch Processing

```scala
validateBatchOptions(
  caseInsensitiveParams: Map[String, String]): Unit
```
validateBatchOptions...FIXME
validateBatchOptions is used when:
- KafkaSourceProvider is requested to createSource
## failOnDataLoss

```scala
failOnDataLoss(
  caseInsensitiveParams: Map[String, String]): Boolean
```
failOnDataLoss looks up the failOnDataLoss configuration property (in the given caseInsensitiveParams) or defaults to true.
failOnDataLoss is used when:
- KafkaSourceProvider is requested for the following:
    - Source (and creates a KafkaSource)
    - BaseRelation (and creates a KafkaRelation)
- KafkaScan is requested for the following:
    - Batch (and creates a KafkaBatch)
    - MicroBatchStream (and creates a KafkaMicroBatchStream)
    - ContinuousStream (and creates a KafkaContinuousStream)
## Utilities

### kafkaParamsForDriver

```scala
kafkaParamsForDriver(
  specifiedKafkaParams: Map[String, String]): Map[String, Object]
```
kafkaParamsForDriver...FIXME
kafkaParamsForDriver is used when:
- KafkaBatch is requested to planInputPartitions
- KafkaRelation is requested to buildScan
- KafkaSourceProvider is requested for a streaming source
- KafkaScan is requested for a MicroBatchStream and ContinuousStream
### Kafka Producer Parameters

```scala
kafkaParamsForProducer(
  params: CaseInsensitiveMap[String]): ju.Map[String, Object]
```
kafkaParamsForProducer converts the given params (using convertToSpecifiedParams).
kafkaParamsForProducer creates a KafkaConfigUpdater for the executor module (with the converted params) and defines the two serializer-specific options to use ByteArraySerializer:

- key.serializer
- value.serializer
In the end, kafkaParamsForProducer requests the KafkaConfigUpdater to build a Kafka configuration (Map[String, Object]).
kafkaParamsForProducer ensures that neither kafka.key.serializer nor kafka.value.serializer are specified or throws an IllegalArgumentException.
```text
Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer.
```

```text
Kafka option 'value.serializer' is not supported as values are serialized with ByteArraySerializer.
```
kafkaParamsForProducer is used when:
- KafkaSourceProvider is requested for a streaming sink or relation
- KafkaTable is requested for a WriteBuilder
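A hedged sketch of what the producer configuration is expected to contain for a query with .option("kafka.bootstrap.servers", "localhost:9092") (the address is illustrative; the serializers are always forced to ByteArraySerializer):

```scala
import org.apache.kafka.common.serialization.ByteArraySerializer

// Sketch only: expected producer configuration after kafkaParamsForProducer
// for a query with .option("kafka.bootstrap.servers", "localhost:9092")
val producerParams: Map[String, Object] = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.serializer"    -> classOf[ByteArraySerializer].getName,
  "value.serializer"  -> classOf[ByteArraySerializer].getName
)
```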
### convertToSpecifiedParams

```scala
convertToSpecifiedParams(
  parameters: Map[String, String]): Map[String, String]
```
convertToSpecifiedParams finds kafka.-prefixed keys in the given parameters, drops the kafka. prefix, and creates a new parameter map with the Kafka-specific configuration.
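A minimal sketch of the prefix handling (assumed behavior, not the actual implementation):

```scala
import java.util.Locale

// Sketch only: keep kafka.-prefixed entries and strip the prefix
def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] =
  parameters
    .filter { case (key, _) => key.toLowerCase(Locale.ROOT).startsWith("kafka.") }
    .map { case (key, value) => key.substring("kafka.".length) -> value }

// e.g. Map("kafka.bootstrap.servers" -> "localhost:9092", "subscribe" -> "topic1")
//      becomes Map("bootstrap.servers" -> "localhost:9092")
```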
convertToSpecifiedParams is used when:
- KafkaSourceProvider is requested to createSource, createRelation, and kafkaParamsForProducer
- KafkaScan is requested to toBatch, toMicroBatchStream, and toContinuousStream
## Logging
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside.
Add the following line to conf/log4j.properties:
```text
log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL
```
Refer to Logging.