# KafkaSourceProvider

`KafkaSourceProvider` is the entry point (provider) to the built-in Kafka support in Spark Structured Streaming (and Spark SQL).

`KafkaSourceProvider` is a `DataSourceRegister` (Spark SQL) that registers itself under the **kafka** alias.
`KafkaSourceProvider` supports micro-batch stream processing (through `MicroBatchStream`) and uses a specialized KafkaMicroBatchStream.
## Short Name (Alias)

```scala
shortName(): String
```

`shortName` is part of the `DataSourceRegister` abstraction.

`shortName` is `kafka`.
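The alias is what you give `format` to have Spark resolve this provider. A minimal streaming read (assuming `spark` is a `SparkSession`, e.g. in `spark-shell`; the broker address and topic name are placeholders):

```scala
// Spark resolves the "kafka" alias to KafkaSourceProvider.
val messages = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .load()
```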
## Kafka Consumer Properties on Executors

```scala
kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): Map[String, Object]
```

`kafkaParamsForExecutors` sets the Kafka properties for the Kafka Consumers on executors.

`kafkaParamsForExecutors` creates a `KafkaConfigUpdater` for the `executor` module (with the given `specifiedKafkaParams`).

`kafkaParamsForExecutors` sets (overrides) the following Kafka properties explicitly (in the `KafkaConfigUpdater`):

| ConsumerConfig's Key | Value | Note |
|---|---|---|
| `key.deserializer` | `ByteArrayDeserializer` | |
| `value.deserializer` | `ByteArrayDeserializer` | |
| `auto.offset.reset` | `none` | |
| `group.id` | `[uniqueGroupId]-executor` | `setIfUnset` |
| `enable.auto.commit` | `false` | |
| `receive.buffer.bytes` | `65536` | `setIfUnset` |

In the end, `kafkaParamsForExecutors` requests the `KafkaConfigUpdater` to build a Kafka configuration.
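A minimal sketch of the resulting executor-side consumer configuration, using a plain `Map` instead of `KafkaConfigUpdater` (illustrative only; the names `executorKafkaParams`, `forced` and `ifUnset` are not from the Spark codebase):

```scala
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Sketch of what the executor-side consumer configuration ends up with.
def executorKafkaParams(
    specifiedKafkaParams: Map[String, String],
    uniqueGroupId: String): Map[String, Object] = {
  val deserializer = classOf[ByteArrayDeserializer].getName
  val forced = Map(
    "key.deserializer"   -> deserializer,
    "value.deserializer" -> deserializer,
    "auto.offset.reset"  -> "none",    // Spark manages offsets itself
    "enable.auto.commit" -> "false")   // offsets are never committed to Kafka
  val ifUnset = Map(                   // applied only when the user has not set them
    "group.id"             -> s"$uniqueGroupId-executor",
    "receive.buffer.bytes" -> "65536")
  // user-specified params win over the setIfUnset defaults; forced values win over everything
  ifUnset ++ specifiedKafkaParams ++ forced
}
```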
`kafkaParamsForExecutors` is used when:

* `KafkaSourceProvider` is requested to createSource (for a KafkaSource)
* `KafkaScan` is requested to toMicroBatchStream (to create a KafkaMicroBatchStream) and toContinuousStream (for a KafkaContinuousStream)
* `KafkaBatch` is requested to planInputPartitions (for KafkaBatchInputPartitions)
* `KafkaRelation` is requested to buildScan (for a KafkaSourceRDD)
## Unique Group ID for Batch Queries

```scala
batchUniqueGroupId(
  params: CaseInsensitiveMap[String]): String
```

`batchUniqueGroupId` takes the GROUP_ID_PREFIX option, if specified, or defaults to the `spark-kafka-relation` prefix to build the following group ID:

```text
[groupIdPrefix]-[randomUUID]
```

`batchUniqueGroupId` is used when:

* `KafkaBatch` is requested to planInputPartitions
* `KafkaRelation` is requested to build a Scan
## Unique Group ID for Streaming Queries

```scala
streamingUniqueGroupId(
  params: CaseInsensitiveMap[String],
  metadataPath: String): String
```

`streamingUniqueGroupId` takes the GROUP_ID_PREFIX option, if specified, or defaults to the `spark-kafka-source` prefix to build the following group ID:

```text
[groupIdPrefix]-[randomUUID]-[metadataPath.hashCode]
```

`streamingUniqueGroupId` is used when:

* `KafkaSourceProvider` is requested to create a Source
* `KafkaScan` is requested to toMicroBatchStream, toContinuousStream
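As a quick illustration of the two group ID formats above, a minimal sketch assuming `groupIdPrefix` has already been resolved from the options (the real methods read GROUP_ID_PREFIX from `params`; the function names are illustrative):

```scala
import java.util.UUID

// Sketch only: the actual methods resolve groupIdPrefix from the options first.
def batchGroupId(groupIdPrefix: String = "spark-kafka-relation"): String =
  s"$groupIdPrefix-${UUID.randomUUID}"

def streamingGroupId(
    metadataPath: String,
    groupIdPrefix: String = "spark-kafka-source"): String =
  s"$groupIdPrefix-${UUID.randomUUID}-${metadataPath.hashCode}"
```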
## Required Options

`KafkaSourceProvider` requires the following options (that you can set using the `option` method of `DataStreamReader` or `DataStreamWriter`):

* Exactly one of the following options: `subscribe`, `subscribePattern` or `assign`

Tip: Refer to Kafka Data Source's Options for the supported configuration options.
## Creating KafkaTable

```scala
getTable(
  options: CaseInsensitiveStringMap): KafkaTable
```

`getTable` creates a KafkaTable with the value of the `includeheaders` option (default: `false`).

`getTable` is part of the `SimpleTableProvider` abstraction (Spark SQL).
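For example, Kafka record headers can be requested with the (case-insensitive) option; the broker address and topic name below are placeholders:

```scala
// Ask for Kafka record headers to be included as an additional "headers" column.
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .option("includeHeaders", "true")
  .load()
```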
## Creating Streaming Sink

```scala
createSink(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  partitionColumns: Seq[String],
  outputMode: OutputMode): Sink
```

`createSink` creates a KafkaSink for the `topic` option (if defined) and Kafka Producer parameters.

`createSink` is part of the StreamSinkProvider abstraction.
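A streaming write that exercises this path could look as follows, reusing the `messages` Dataset from the reader example above (the broker address, topic name and checkpoint location are placeholders):

```scala
// The "topic" option tells the Kafka sink which topic to write to.
val query = messages
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "demo-output")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
```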
## Creating Streaming Source

```scala
createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
```

`createSource` is part of the StreamSourceProvider abstraction.

`createSource` validates stream options.

`createSource`...FIXME
## Validating Options For Batch And Streaming Queries

```scala
validateGeneralOptions(
  parameters: Map[String, String]): Unit
```

Note: Parameters are case-insensitive, i.e. `OptioN` and `option` are equal.

`validateGeneralOptions` makes sure that exactly one topic subscription strategy is used in `parameters`:

* `subscribe`
* `subscribepattern`
* `assign`

`validateGeneralOptions` reports an `IllegalArgumentException` when there is no subscription strategy in use or more than one strategy is used.
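The exactly-one check could be sketched as follows (the names are illustrative and not from the Spark codebase; the real method also validates the value of the chosen strategy, as described next):

```scala
// Sketch of the "exactly one subscription strategy" validation
// (parameters are assumed to be lower-cased already, as options are case-insensitive).
val strategyOptions = Set("assign", "subscribe", "subscribepattern")

def validateSubscriptionStrategy(parameters: Map[String, String]): Unit = {
  val specified = parameters.keySet.intersect(strategyOptions)
  require(
    specified.size == 1,
    s"One and only one of 'assign', 'subscribe' or 'subscribePattern' must be " +
      s"specified (found: ${specified.mkString(", ")})")  // require throws IllegalArgumentException
}
```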
`validateGeneralOptions` makes sure that the value of the subscription strategy meets the requirements:

* `assign` strategy starts with `{` (the opening curly brace)
* `subscribe` strategy has at least one topic (in a comma-separated list of topics)
* `subscribepattern` strategy has the pattern defined
`validateGeneralOptions` makes sure that `group.id` has not been specified and reports an `IllegalArgumentException` otherwise.

```text
Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
```

`validateGeneralOptions` makes sure that `auto.offset.reset` has not been specified and reports an `IllegalArgumentException` otherwise.

```text
Kafka option 'auto.offset.reset' is not supported.
Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.
```

`validateGeneralOptions` makes sure that the following options have not been specified and reports an `IllegalArgumentException` otherwise:

* `kafka.key.deserializer`
* `kafka.value.deserializer`
* `kafka.enable.auto.commit`
* `kafka.interceptor.classes`

In the end, `validateGeneralOptions` makes sure that the `kafka.bootstrap.servers` option was specified and reports an `IllegalArgumentException` otherwise.

```text
Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer
```

`validateGeneralOptions` is used when `KafkaSourceProvider` validates options for streaming and batch queries.
## Creating ConsumerStrategy

```scala
strategy(
  caseInsensitiveParams: Map[String, String]): ConsumerStrategy
```

`strategy` converts a key (in `caseInsensitiveParams`) to a ConsumerStrategy.

| Key | ConsumerStrategy |
|---|---|
| `assign` | AssignStrategy |
| `subscribe` | SubscribeStrategy |
| `subscribepattern` | SubscribePatternStrategy |

`strategy` is used when...FIXME
### AssignStrategy

AssignStrategy with Kafka TopicPartitions.

`strategy` uses the `JsonUtils.partitions` method to parse a JSON with topic names and partitions, e.g.

```json
{"topicA":[0,1],"topicB":[0,1]}
```

The topic names and partitions are mapped directly to Kafka's `TopicPartition` objects.
### SubscribeStrategy

SubscribeStrategy with topic names.

`strategy` extracts topic names from a comma-separated string, e.g.

```text
topic1,topic2,topic3
```
### SubscribePatternStrategy

SubscribePatternStrategy with a topic subscription regex pattern (that uses a Java `java.util.regex.Pattern` for the pattern), e.g.

```text
topic\d
```
## Name and Schema of Streaming Source

```scala
sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
```

`sourceSchema` gives the short name (i.e. `kafka`) and the fixed schema.
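For reference, the fixed schema of Kafka source rows is the following (per the Spark Kafka integration guide; the `headers` column appears only when the `includeHeaders` option is enabled):

| Column | Type |
|---|---|
| `key` | binary |
| `value` | binary |
| `topic` | string |
| `partition` | int |
| `offset` | long |
| `timestamp` | timestamp |
| `timestampType` | int |
| `headers` (optional) | array |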
Internally, `sourceSchema` validates Kafka options and makes sure that the optional input `schema` is indeed undefined.

When the input `schema` is defined, `sourceSchema` reports an `IllegalArgumentException`.

```text
Kafka source has a fixed schema and cannot be set with a custom one
```

`sourceSchema` is part of the StreamSourceProvider abstraction.
## Validating Kafka Options for Streaming Queries

```scala
validateStreamOptions(
  caseInsensitiveParams: Map[String, String]): Unit
```

`validateStreamOptions` makes sure that the `endingoffsets` option is not used. Otherwise, `validateStreamOptions` reports an `IllegalArgumentException`.

```text
ending offset not valid in streaming queries
```

`validateStreamOptions` validates the general options.

`validateStreamOptions` is used when `KafkaSourceProvider` is requested for the schema for Kafka source and to create a KafkaSource.
## Converting Configuration Options to KafkaOffsetRangeLimit

```scala
getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
```

`getKafkaOffsetRangeLimit` finds the given `offsetOptionKey` in the `params` and does the following conversion:

* `latest` becomes LatestOffsetRangeLimit
* `earliest` becomes EarliestOffsetRangeLimit
* A JSON-formatted text becomes SpecificOffsetRangeLimit
* When the given `offsetOptionKey` is not found, `getKafkaOffsetRangeLimit` returns the given `defaultOffsets`
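In user-facing terms, these are the values of the `startingOffsets` and `endingOffsets` source options, e.g. for a batch query (the broker address, topic name and offsets below are placeholders):

```scala
// startingOffsets / endingOffsets accept "earliest", "latest" or per-partition JSON;
// in the JSON form, -2 means earliest and -1 means latest for a partition.
val batchFromKafka = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .option("startingOffsets", """{"demo-topic":{"0":23,"1":-2}}""")
  .option("endingOffsets", "latest")
  .load()
```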
`getKafkaOffsetRangeLimit` is used when:

* `KafkaSourceProvider` is requested to createSource, createMicroBatchReader, createContinuousReader, createRelation, and validateBatchOptions
## Creating Fake BaseRelation

```scala
createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
```

`createRelation`...FIXME

`createRelation` is part of the `RelationProvider` abstraction (Spark SQL).
## Validating Configuration Options for Batch Processing

```scala
validateBatchOptions(
  caseInsensitiveParams: Map[String, String]): Unit
```

`validateBatchOptions`...FIXME

`validateBatchOptions` is used when:

* `KafkaSourceProvider` is requested to createSource
## failOnDataLoss

```scala
failOnDataLoss(
  caseInsensitiveParams: Map[String, String]): Boolean
```

`failOnDataLoss` looks up the failOnDataLoss configuration property (in the given `caseInsensitiveParams`) or defaults to `true`.

`failOnDataLoss` is used when:

* `KafkaSourceProvider` is requested for the following:
    * Source (and creates a KafkaSource)
    * BaseRelation (and creates a KafkaRelation)
* `KafkaScan` is requested for the following:
    * Batch (and creates a KafkaBatch)
    * MicroBatchStream (and creates a KafkaMicroBatchStream)
    * ContinuousStream (and creates a KafkaContinuousStream)
## Utilities

### kafkaParamsForDriver

```scala
kafkaParamsForDriver(
  specifiedKafkaParams: Map[String, String]): Map[String, Object]
```

`kafkaParamsForDriver`...FIXME

`kafkaParamsForDriver` is used when:

* `KafkaBatch` is requested to planInputPartitions
* `KafkaRelation` is requested to buildScan
* `KafkaSourceProvider` is requested for a streaming source
* `KafkaScan` is requested for a MicroBatchStream and ContinuousStream
### Kafka Producer Parameters

```scala
kafkaParamsForProducer(
  params: CaseInsensitiveMap[String]): ju.Map[String, Object]
```

`kafkaParamsForProducer` converts the given `params`.

`kafkaParamsForProducer` creates a `KafkaConfigUpdater` for the `executor` module (with the converted params) and defines the two serializer-specific options to use `ByteArraySerializer`:

* `key.serializer`
* `value.serializer`

In the end, `kafkaParamsForProducer` requests the `KafkaConfigUpdater` to build a Kafka configuration (`Map[String, Object]`).
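A minimal sketch of the producer-side counterpart to the executor consumer configuration above (illustrative only; the real code goes through `KafkaConfigUpdater` and returns a `java.util.Map`; the function name is not from the Spark codebase):

```scala
import org.apache.kafka.common.serialization.ByteArraySerializer

// Sketch: keys and values are always serialized as byte arrays,
// regardless of what the user specified.
def producerKafkaParams(specifiedKafkaParams: Map[String, String]): Map[String, Object] = {
  val serializer = classOf[ByteArraySerializer].getName
  specifiedKafkaParams ++ Map(
    "key.serializer"   -> serializer,
    "value.serializer" -> serializer)
}
```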
`kafkaParamsForProducer` ensures that neither `kafka.key.serializer` nor `kafka.value.serializer` is specified, or throws an `IllegalArgumentException`.

```text
Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer.
Kafka option 'value.serializer' is not supported as values are serialized with ByteArraySerializer.
```

`kafkaParamsForProducer` is used when:

* `KafkaSourceProvider` is requested for a streaming sink or relation
* `KafkaTable` is requested for a WriteBuilder
### convertToSpecifiedParams

```scala
convertToSpecifiedParams(
  parameters: Map[String, String]): Map[String, String]
```

`convertToSpecifiedParams` finds `kafka.`-prefixed keys in the given `parameters`, drops the `kafka.` prefix, and creates new parameters with a Kafka-specific configuration.
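A minimal sketch of the conversion (illustrative only; the function name is not from the Spark codebase):

```scala
// Keep only "kafka."-prefixed options and strip the prefix,
// e.g. "kafka.bootstrap.servers" becomes "bootstrap.servers".
def toSpecifiedParams(parameters: Map[String, String]): Map[String, String] =
  parameters.collect {
    case (key, value) if key.toLowerCase.startsWith("kafka.") =>
      key.drop("kafka.".length) -> value
  }
```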
`convertToSpecifiedParams` is used when:

* `KafkaSourceProvider` is requested to createSource, createRelation, kafkaParamsForProducer
* `KafkaScan` is requested to toBatch, toMicroBatchStream, toContinuousStream
## Logging

Enable `ALL` logging level for the `org.apache.spark.sql.kafka010.KafkaSourceProvider` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL
```

Refer to Logging.