== KafkaRelation
KafkaRelation is a BaseRelation (Spark SQL).

KafkaRelation is a TableScan (Spark SQL).

KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation.
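As a sketch of the user-facing path that leads here (the topic name and bootstrap servers are made-up values, and `spark` is an existing SparkSession), a batch query over the kafka data source goes through KafkaSourceProvider, which creates a KafkaRelation under the covers:

[source, scala]
----
// Hypothetical batch read; "t1" and "localhost:9092" are placeholder values.
// spark.read.format("kafka") resolves to KafkaSourceProvider, whose
// createRelation creates a KafkaRelation for the scan.
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "t1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
----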
[[options]]
.KafkaRelation's Options
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| kafkaConsumer.pollTimeoutMs
a| [[kafkaConsumer.pollTimeoutMs]][[pollTimeoutMs]] Timeout (in milliseconds) to poll data from Kafka

Default: spark.network.timeout configuration if set or 120s
|===
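The default resolution described above can be sketched as standalone Scala (the maps stand in for the real source options and SparkConf; this is not the actual Spark code):

[source, scala]
----
// Sketch: resolve kafkaConsumer.pollTimeoutMs from the source options,
// falling back to spark.network.timeout (in seconds, e.g. "120s")
// converted to milliseconds.
def pollTimeoutMs(
    sourceOptions: Map[String, String],
    sparkConf: Map[String, String]): Long = {
  sourceOptions
    .get("kafkaConsumer.pollTimeoutMs")
    .map(_.toLong)
    .getOrElse {
      // spark.network.timeout defaults to 120s when not set
      val networkTimeout = sparkConf.getOrElse("spark.network.timeout", "120s")
      networkTimeout.stripSuffix("s").toLong * 1000L
    }
}
----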
[[logging]]
[TIP]
====
Enable ALL logging levels for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:

[source]
----
log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=ALL
----

Refer to <>.
====
=== [[creating-instance]] Creating KafkaRelation Instance

KafkaRelation takes the following when created:

- [[sqlContext]] SQLContext
- [[strategy]] ConsumerStrategy
- [[sourceOptions]] Source options (Map[String, String])
- [[specifiedKafkaParams]] User-defined Kafka parameters (Map[String, String])
- [[failOnDataLoss]] failOnDataLoss flag
- [[startingOffsets]] Starting offsets
- [[endingOffsets]] Ending offsets
=== [[getPartitionOffsets]] getPartitionOffsets Internal Method

[source, scala]
----
getPartitionOffsets(
  kafkaReader: KafkaOffsetReader,
  kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
----

CAUTION: FIXME

NOTE: getPartitionOffsets is used exclusively when KafkaRelation is requested to <<buildScan, build a distributed data scan>>.
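Conceptually, getPartitionOffsets resolves an offset-range limit (earliest, latest, or specific offsets) to concrete per-partition offsets. A simplified standalone sketch, where TopicPartition is modeled as a (topic, partition) pair and the -2/-1 sentinels follow Kafka's conventions (this ADT is an approximation, not Spark's actual classes):

[source, scala]
----
// Simplified model of KafkaOffsetRangeLimit resolution.
sealed trait OffsetRangeLimit
case object EarliestOffsetRangeLimit extends OffsetRangeLimit
case object LatestOffsetRangeLimit extends OffsetRangeLimit
final case class SpecificOffsetRangeLimit(
    offsets: Map[(String, Int), Long]) extends OffsetRangeLimit

// Kafka's sentinel values: -2 = earliest, -1 = latest.
val EARLIEST = -2L
val LATEST = -1L

def getPartitionOffsets(
    partitions: Set[(String, Int)],
    limit: OffsetRangeLimit): Map[(String, Int), Long] = limit match {
  case EarliestOffsetRangeLimit          => partitions.map(_ -> EARLIEST).toMap
  case LatestOffsetRangeLimit            => partitions.map(_ -> LATEST).toMap
  case SpecificOffsetRangeLimit(offsets) => offsets
}
----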
=== [[buildScan]] Building Distributed Data Scan with Column Pruning -- buildScan Method

[source, scala]
----
buildScan(): RDD[Row]
----

NOTE: buildScan is part of the TableScan (Spark SQL) contract to build a distributed data scan with column pruning.
buildScan generates a unique group ID of the format spark-kafka-relation-[randomUUID] (to make sure that a streaming query creates a new consumer group).
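Generating that group ID can be sketched as:

[source, scala]
----
import java.util.UUID

// Sketch of the unique group ID format described above:
// spark-kafka-relation-[randomUUID]
val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
----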
buildScan creates a KafkaOffsetReader with the following:

- The given <<strategy, ConsumerStrategy>> and the <<sourceOptions, source options>>
- Kafka parameters for the driver based on the given <<specifiedKafkaParams, user-defined Kafka parameters>>
- spark-kafka-relation-[randomUUID]-driver for the driverGroupIdPrefix
buildScan uses the KafkaOffsetReader to <<getPartitionOffsets, get the starting and ending partition offsets>>. buildScan requests the KafkaOffsetReader to close afterwards.
buildScan creates offset ranges (that are a collection of KafkaSourceRDDOffsetRanges with a Kafka TopicPartition, beginning and ending offsets and undefined preferred location).
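Pairing the starting and ending offsets per partition can be sketched as follows (KafkaSourceRDDOffsetRange is modeled here as a plain case class, and the preferred location stays undefined as noted above; this is an approximation, not Spark's code):

[source, scala]
----
// Simplified model of building offset ranges from per-partition
// from/until offsets. TopicPartition is modeled as (topic, partition).
final case class OffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) // undefined (None) for KafkaRelation

def offsetRanges(
    from: Map[(String, Int), Long],
    until: Map[(String, Int), Long]): Seq[OffsetRange] =
  until.keys.toSeq.sortBy(identity).map { case (topic, p) =>
    OffsetRange(topic, p, from((topic, p)), until((topic, p)), None)
  }
----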
buildScan prints out the following INFO message to the logs:

[source]
----
Generating RDD of offset ranges: [offsetRanges]
----
buildScan creates a KafkaSourceRDD with the following:

- Kafka parameters for executors based on the given <<specifiedKafkaParams, user-defined Kafka parameters>> and the unique group ID (spark-kafka-relation-[randomUUID])
- The offset ranges created
- <<kafkaConsumer.pollTimeoutMs, kafkaConsumer.pollTimeoutMs>> configuration
- The given <<failOnDataLoss, failOnDataLoss>> flag
- reuseKafkaConsumer flag off (false)
buildScan requests the KafkaSourceRDD to map Kafka ConsumerRecords to InternalRows.
In the end, buildScan requests the <<sqlContext, SQLContext>> to create a DataFrame (with the name kafka and the predefined schema) from the RDD[InternalRow].
buildScan throws an IllegalStateException when the starting offsets and ending offsets use different topic partitions:

[source]
----
different topic partitions for starting offsets topics [fromTopics] and ending offsets topics [untilTopics]
----

buildScan throws an IllegalStateException when...FIXME

[source]
----
[tp] doesn't have a from offset
----
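The first check can be sketched as a standalone approximation of the validation (not the actual Spark code; topic partitions are modeled as (topic, partition) pairs):

[source, scala]
----
// Sketch: verify starting and ending offsets cover the same topic
// partitions, mirroring the IllegalStateException described above.
def requireMatchingPartitions(
    fromPartitions: Set[(String, Int)],
    untilPartitions: Set[(String, Int)]): Unit =
  if (fromPartitions != untilPartitions) {
    throw new IllegalStateException(
      s"different topic partitions for starting offsets topics[$fromPartitions] " +
        s"and ending offsets topics[$untilPartitions]")
  }
----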