== KafkaRelation

`KafkaRelation` is a BaseRelation with a TableScan.

`KafkaRelation` is <<creating-instance, created>> when `KafkaSourceProvider` is requested to create a BaseRelation (as a RelationProvider).
[[schema]]
`KafkaRelation` uses the following fixed schema.

.KafkaRelation's Schema (in the positional order)
[cols="1m,2",options="header",width="100%"]
|===
| Field Name
| Data Type

| key
| BinaryType

| value
| BinaryType

| topic
| StringType

| partition
| IntegerType

| offset
| LongType

| timestamp
| TimestampType

| timestampType
| IntegerType
|===
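The schema shows up whenever a batch query is run over Kafka. A minimal sketch, assuming a broker at `localhost:9092` and a topic `topic1` (both placeholders):

[source, scala]
----
// Batch-read a Kafka topic; the DataFrame uses KafkaRelation's fixed schema
val records = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic1")                       // placeholder topic
  .load
records.printSchema
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
----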
[[toString]]
`KafkaRelation` uses the following human-readable text representation:

----
KafkaRelation(strategy=[strategy], start=[startingOffsets], end=[endingOffsets])
----
[[internal-registries]]
.KafkaRelation's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,2",options="header",width="100%"]
|===
| Name
| Description

| pollTimeoutMs
a| [[pollTimeoutMs]] Timeout (in milliseconds) to poll data from Kafka (`pollTimeoutMs` for KafkaSourceRDD)

Initialized with the value of the first of the following configuration properties that is found:

. `kafkaConsumer.pollTimeoutMs` in the <<sourceOptions, source options>>
. `spark.network.timeout` in the SparkConf

If neither is set, defaults to `120s`.

Used exclusively when `KafkaRelation` is requested to <<buildScan, build a distributed data scan with column pruning>>.
|===
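A sketch of overriding the poll timeout through the source option (broker and topic are placeholders):

[source, scala]
----
val records = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("kafkaConsumer.pollTimeoutMs", "30000") // poll timeout of 30 seconds
  .load
----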
[[logging]]
[TIP]
====
Enable `INFO` or `DEBUG` logging level for the `org.apache.spark.sql.kafka010.KafkaRelation` logger to see what happens inside.

Add the following lines to `conf/log4j2.properties`:

----
logger.KafkaRelation.name = org.apache.spark.sql.kafka010.KafkaRelation
logger.KafkaRelation.level = DEBUG
----

Refer to spark-logging.md[Logging].
====
=== [[creating-instance]] Creating KafkaRelation Instance

`KafkaRelation` takes the following when created (see the sketch after the list):

- [[sqlContext]] SQLContext
- [[strategy]] ConsumerStrategy
- [[sourceOptions]] Source options (as `Map[String, String]`) that directly correspond to the options of DataFrameReader
- [[specifiedKafkaParams]] User-defined Kafka parameters (as `Map[String, String]`)
- [[failOnDataLoss]] `failOnDataLoss` flag
- [[startingOffsets]] Starting offsets (as KafkaOffsetRangeLimit)
- [[endingOffsets]] Ending offsets (as KafkaOffsetRangeLimit)
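A minimal sketch of the constructor as it follows from the list above (a paraphrase, not the exact Spark source; `ConsumerStrategy` and `KafkaOffsetRangeLimit` are internal to the `kafka010` package):

[source, scala]
----
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, TableScan}

class KafkaRelation(
    override val sqlContext: SQLContext,
    strategy: ConsumerStrategy,
    sourceOptions: Map[String, String],
    specifiedKafkaParams: Map[String, String],
    failOnDataLoss: Boolean,
    startingOffsets: KafkaOffsetRangeLimit,
    endingOffsets: KafkaOffsetRangeLimit)
  extends BaseRelation with TableScan
----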
=== [[buildScan]] Building Distributed Data Scan with Column Pruning (as TableScan) -- buildScan Method

[source, scala]
----
buildScan(): RDD[Row]
----

`buildScan` is part of the TableScan abstraction.
`buildScan` creates kafkaParamsForDriver from the <<specifiedKafkaParams, user-defined Kafka parameters>> and uses it to create a KafkaOffsetReader (together with the <<strategy, ConsumerStrategy>>, the <<sourceOptions, source options>>, and a driver group ID of the format `spark-kafka-relation-[randomUUID]-driver`).

`buildScan` then uses the KafkaOffsetReader to <<getPartitionOffsets, get the partition offsets>> for the <<startingOffsets, starting>> and <<endingOffsets, ending>> offsets (and requests the KafkaOffsetReader to close afterwards).
`buildScan`...FIXME

`buildScan` prints out the following INFO message to the logs:

----
GetBatch generating RDD of offset range: [comma-separated offsetRanges]
----
`buildScan` then creates kafkaParamsForExecutors and uses it to create a KafkaSourceRDD (with the <<pollTimeoutMs, pollTimeoutMs>>) and maps over the Kafka records (using the `RDD.map` operator that creates a `MapPartitionsRDD`).
TIP: Use `RDD.toDebugString` to see the two RDDs, i.e. `KafkaSourceRDD` and `MapPartitionsRDD`, in the RDD lineage.
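For example, with `records` being a batch Kafka DataFrame (as in the earlier sketch):

[source, scala]
----
// Print the RDD lineage; expect KafkaSourceRDD and MapPartitionsRDD entries
println(records.rdd.toDebugString)
----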
In the end, `buildScan` requests the <<sqlContext, SQLContext>> to create a `DataFrame` from the KafkaSourceRDD and the <<schema, schema>>.
`buildScan` throws an `IllegalStateException` when the topic partitions of the starting offsets are different from those of the ending offsets:

----
different topic partitions for starting offsets topics[[fromTopics]] and ending offsets topics[[untilTopics]]
----
=== [[getPartitionOffsets]] getPartitionOffsets Internal Method

[source, scala]
----
getPartitionOffsets(
  kafkaReader: KafkaOffsetReader,
  kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
----
`getPartitionOffsets` requests the input KafkaOffsetReader to fetchTopicPartitions.

`getPartitionOffsets` then uses the input KafkaOffsetRangeLimit to return the offset for every fetched Kafka `TopicPartition`:
. For `EarliestOffsetRangeLimit`, `getPartitionOffsets` returns a map with every `TopicPartition` and `-2L` (as the offset)
. For `LatestOffsetRangeLimit`, `getPartitionOffsets` returns a map with every `TopicPartition` and `-1L` (as the offset)
. For `SpecificOffsetRangeLimit`, `getPartitionOffsets` returns the given per-partition offsets after <<getPartitionOffsets-validateTopicPartitions, validating them against the fetched TopicPartitions>>
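The `-2L` and `-1L` sentinels are the same markers that batch queries accept in the `startingOffsets` and `endingOffsets` JSON options. A sketch (broker, topic, and offsets are placeholders):

[source, scala]
----
val records = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  // partition 0 starts at offset 23, partition 1 at the earliest offset (-2)
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .option("endingOffsets", "latest")
  .load
----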
NOTE: `getPartitionOffsets` is used exclusively when `KafkaRelation` is requested to <<buildScan, build a distributed data scan with column pruning>>.
==== [[getPartitionOffsets-validateTopicPartitions]] Validating TopicPartitions (Against Partition Offsets) -- validateTopicPartitions Inner Method

[source, scala]
----
validateTopicPartitions(
  partitions: Set[TopicPartition],
  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
----
NOTE: `validateTopicPartitions` is a Scala inner method of <<getPartitionOffsets, getPartitionOffsets>>, i.e. `validateTopicPartitions` is defined within the body of `getPartitionOffsets` and so is visible to and can only be used in `getPartitionOffsets`.
`validateTopicPartitions` asserts that the input set of Kafka `TopicPartitions` is exactly the set of the keys in the input `partitionOffsets`.

`validateTopicPartitions` prints out the following DEBUG message to the logs:

----
Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
----

In the end, `validateTopicPartitions` returns the input `partitionOffsets`.
If the input set of Kafka `TopicPartitions` is not the set of the keys in the input `partitionOffsets`, `validateTopicPartitions` throws an `AssertionError`:

----
assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.
Use -1 for latest, -2 for earliest, if you don't care.
Specified: [partitionOffsets] Assigned: [partitions]
----
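A minimal standalone sketch of the validation described above (a simplified paraphrase, not the exact Spark source; `println` stands in for the DEBUG log message):

[source, scala]
----
import org.apache.kafka.common.TopicPartition

def validateTopicPartitions(
    partitions: Set[TopicPartition],
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  // Scala's assert throws an AssertionError with the "assertion failed: " prefix,
  // producing the message quoted above
  assert(partitions == partitionOffsets.keySet,
    "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
      "Use -1 for latest, -2 for earliest, if you don't care.\n" +
      s"Specified: ${partitionOffsets.keySet} Assigned: $partitions")
  println(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
  partitionOffsets
}
----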