== KafkaSourceRDD
KafkaSourceRDD is an RDD of Kafka's ConsumerRecords (RDD[ConsumerRecord[Array[Byte], Array[Byte]]]) with no parent RDDs.
KafkaSourceRDD is created when:

- KafkaRelation is requested to build a distributed data scan with column pruning
- KafkaSource is requested to generate a streaming DataFrame with records from Kafka for a streaming micro-batch
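For context, here is a minimal streaming query that leads KafkaSource to build a KafkaSourceRDD for every micro-batch; the broker address and topic name are assumptions:

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("KafkaSourceRDD-demo") // hypothetical application name
  .getOrCreate()

// Reading from the kafka source creates a KafkaSource that, for every
// micro-batch, generates a KafkaSourceRDD with the records to process
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
  .option("subscribe", "demo-topic")                   // assumed topic
  .load()

// key and value are binary, matching ConsumerRecord[Array[Byte], Array[Byte]]
val query = records
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
----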
=== [[creating-instance]] Creating Instance
KafkaSourceRDD takes the following when created:
- [[sc]] SparkContext
- [[executorKafkaParams]] Collection of key-value settings for executors reading records from Kafka topics
- [[offsetRanges]] Collection of KafkaSourceRDDOffsetRange offsets
- [[pollTimeoutMs]] Timeout (in milliseconds) to poll data from Kafka
+
Used when KafkaSourceRDD is requested to <<compute, compute a partition>> (for given offsets) and in turn requests the CachedKafkaConsumer to poll for records.
- [[failOnDataLoss]] Flag to...FIXME
- [[reuseKafkaConsumer]] Flag to...FIXME
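For illustration, the executor Kafka parameters are plain Kafka consumer settings with byte-array deserializers; the broker address and group id below are assumptions:

[source, scala]
----
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Keys and values stay raw bytes, matching ConsumerRecord[Array[Byte], Array[Byte]]
val executorKafkaParams: ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object]()
  params.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
  params.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
  params.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-kafka-source-demo") // assumed group id
  params
}
----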
=== [[getPreferredLocations]] Placement Preferences of Partition (Preferred Locations) -- getPreferredLocations Method
[source, scala]
----
getPreferredLocations(split: Partition): Seq[String]
----
NOTE: getPreferredLocations is part of the RDD contract to specify placement preferences.
getPreferredLocations converts the given Partition to a KafkaSourceRDDPartition and...FIXME
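A plausible sketch of that conversion, with stand-in types as assumptions for KafkaSourceRDDPartition and its offset range:

[source, scala]
----
// Stand-ins (assumptions) for the real partition and offset-range types
final case class OffsetRangeSketch(preferredLoc: Option[String])
final case class PartitionSketch(index: Int, offsetRange: OffsetRangeSketch)

// A partition's placement preference is its range's preferred executor, if any
def preferredLocations(split: PartitionSketch): Seq[String] =
  split.offsetRange.preferredLoc.toSeq
----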
=== [[compute]] Computing Partition -- compute Method
[source, scala]
----
compute(
  thePart: Partition,
  context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
----
NOTE: compute is part of the RDD contract to compute a given partition.
compute uses the KafkaDataConsumer utility to acquire a cached KafkaDataConsumer (for the partition).

compute then <<resolveRange, resolves the offset range>> (the offsetRange of the given partition, which is assumed to be a KafkaSourceRDDPartition).

compute returns a NextIterator whose getNext uses the KafkaDataConsumer to fetch a record (see the sketch below).
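NextIterator is an internal (private) Spark utility, so here is a simplified, self-contained sketch of the idea: fetch one record per call, advance past the offset just returned, and release the consumer once the range is exhausted. DataConsumerSketch merely stands in for KafkaDataConsumer, and its get/release calls are assumptions modeled after it:

[source, scala]
----
import org.apache.kafka.clients.consumer.ConsumerRecord

// Minimal stand-in (assumption) for KafkaDataConsumer
trait DataConsumerSketch {
  def get(
      offset: Long,
      untilOffset: Long,
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  def release(): Unit
}

def recordIterator(
    consumer: DataConsumerSketch,
    fromOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] =
  new Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] {
    private var requestOffset = fromOffset
    private var released = false
    override def hasNext: Boolean = {
      val more = requestOffset < untilOffset
      if (!more && !released) { consumer.release(); released = true } // done with the range
      more
    }
    override def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      val record = consumer.get(requestOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
      requestOffset = record.offset + 1 // advance past the record just returned
      record
    }
  }
----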
When the beginning and ending offsets (of the offset range) are equal, compute prints out the following INFO message to the logs, requests the KafkaDataConsumer to release, and returns an empty iterator:

----
Beginning offset [fromOffset] is the same as ending offset skipping [topic] [partition]
----
compute throws an AssertionError when the beginning offset (fromOffset) is after the ending offset (untilOffset):
[options="wrap"]¶
Beginning offset [fromOffset] is after the ending offset [untilOffset] for topic [topic] partition [partition]. You either provided an invalid fromOffset, or the Kafka topic has been damaged¶
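Taken together, the two guards can be sketched as a small self-contained check; RangeSketch is an assumed stand-in for KafkaSourceRDDOffsetRange and the messages mirror the ones above:

[source, scala]
----
// Stand-in (assumption) for the real offset-range type
final case class RangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

def nonEmptyRange(range: RangeSketch): Boolean =
  if (range.fromOffset == range.untilOffset) {
    // Nothing to read: the caller releases the consumer and returns an empty iterator
    println(s"Beginning offset ${range.fromOffset} is the same as ending offset " +
      s"skipping ${range.topic} ${range.partition}")
    false
  } else {
    // A beginning offset past the ending offset is a hard error
    assert(range.fromOffset < range.untilOffset,
      s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} " +
      s"for topic ${range.topic} partition ${range.partition}. " +
      "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
    true
  }
----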
=== [[getPartitions]] getPartitions Method
[source, scala]
----
getPartitions: Array[Partition]
----
NOTE: getPartitions is part of the RDD contract to specify the partitions of the RDD.
getPartitions...FIXME
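Though left as FIXME above, a plausible sketch is one partition per offset range, indexed by position; the stand-in types are assumptions:

[source, scala]
----
// Stand-ins (assumptions) for the real offset-range and partition types
final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
final case class PartitionSketch(index: Int, offsetRange: OffsetRangeSketch)

// One RDD partition per offset range, indexed by its position in the collection
def partitions(offsetRanges: Seq[OffsetRangeSketch]): Array[PartitionSketch] =
  offsetRanges.zipWithIndex.map { case (range, index) =>
    PartitionSketch(index, range)
  }.toArray
----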
=== [[persist]] Persisting RDD -- persist Method
[source, scala]
----
persist(newLevel: StorageLevel): this.type
----
NOTE: persist is part of the RDD contract to persist an RDD.
persist...FIXME
=== [[resolveRange]] resolveRange Internal Method
[source, scala]
----
resolveRange(
  consumer: KafkaDataConsumer,
  range: KafkaSourceRDDOffsetRange): KafkaSourceRDDOffsetRange
----
resolveRange...FIXME
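One plausible reading of what resolving a range involves is replacing special "earliest"/"latest" markers with offsets the consumer actually has available; the case classes and marker constants below are assumptions, not the real definitions:

[source, scala]
----
// Stand-ins (assumptions) for the real range type and the special offset markers
final case class RangeSketch(fromOffset: Long, untilOffset: Long)
final case class AvailableOffsets(earliest: Long, latest: Long)

val EARLIEST = -2L // assumed marker for "start from the earliest available offset"
val LATEST = -1L   // assumed marker for "read up to the latest available offset"

// Replace the markers with the consumer's actually-available offsets
def resolveRangeSketch(available: AvailableOffsets, range: RangeSketch): RangeSketch = {
  val from = if (range.fromOffset == EARLIEST) available.earliest else range.fromOffset
  val until = if (range.untilOffset == LATEST) available.latest else range.untilOffset
  RangeSketch(from, until)
}
----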
NOTE: resolveRange is used exclusively when KafkaSourceRDD is requested to <<compute, compute a partition>>.