== KafkaOffsetRangeCalculator
[[minPartitions]][[creating-instance]] KafkaOffsetRangeCalculator takes an optional minimum number of partitions per executor (minPartitions) when created. When defined, minPartitions has to be greater than 0.
[[apply]] When created with a DataSourceOptions, KafkaOffsetRangeCalculator uses the minPartitions option for the minimum number of partitions per executor.
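The lookup of the option can be sketched as follows. This is only a minimal illustration: a plain `Map[String, String]` stands in for DataSourceOptions, the object and method names are hypothetical, and the exact-case key lookup is an assumption.

```scala
object MinPartitionsSketch {
  // Hypothetical helper: reads the optional minPartitions setting from plain string options.
  // A Map stands in for DataSourceOptions (assumption made to keep the sketch self-contained).
  def minPartitions(options: Map[String, String]): Option[Int] = {
    val value = options.get("minPartitions").map(_.trim.toInt)
    // Either undefined or greater than 0
    require(value.forall(_ > 0), "minPartitions must be greater than 0 when defined")
    value
  }
}
```

With no `minPartitions` entry the result is `None`, which corresponds to the "undefined" case described above.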
=== [[getRanges]] Offset Ranges -- getRanges Method
[source, scala]
----
getRanges(
  fromOffsets: PartitionOffsetMap,
  untilOffsets: PartitionOffsetMap,
  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]
----
getRanges finds the common TopicPartitions, i.e. the keys that are used in both the fromOffsets and untilOffsets collections (their intersection).
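For every common TopicPartition, getRanges creates a KafkaOffsetRange with the from and until offsets from the fromOffsets and untilOffsets collections (and the preferred location undefined). getRanges filters out the TopicPartitions that have no records to consume (i.e. the size is not greater than 0).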
At this point, getRanges knows the TopicPartitions with records to consume.
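The intersection and filtering steps above can be sketched with simplified stand-in types. The names `CommonPartitionsSketch`, `OffsetRange` and `rangesToConsume` are hypothetical, a TopicPartition is modelled as a `(topic, partition)` pair, and the sorting is there only to make the result deterministic.

```scala
object CommonPartitionsSketch {
  // Stand-ins (assumptions): a TopicPartition is a (topic, partition) pair
  type TopicPartition = (String, Int)
  type PartitionOffsetMap = Map[TopicPartition, Long]

  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  def rangesToConsume(from: PartitionOffsetMap, until: PartitionOffsetMap): Seq[OffsetRange] = {
    // Keys present in both collections (intersection)
    val common = from.keySet.intersect(until.keySet)
    common.toSeq.sorted
      .map(tp => OffsetRange(tp, from(tp), until(tp)))
      // Keep only the partitions that have records to consume
      .filter(_.size > 0)
  }
}
```

A partition that appears in only one of the two maps, or whose from and until offsets are equal, does not show up in the result.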
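getRanges branches off based on the defined minimum number of partitions per executor and the number of KafkaOffsetRanges (TopicPartitions with records to consume).
For the minimum number of partitions per executor undefined or smaller than the number of KafkaOffsetRanges (TopicPartitions to consume records from), getRanges updates every KafkaOffsetRange with the preferred executor (based on the TopicPartition and the executorLocations).
Otherwise (with the minimum number of partitions per executor defined and not smaller than the number of KafkaOffsetRanges), getRanges splits KafkaOffsetRanges into smaller ones.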
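A rough sketch of the two branches follows. It assumes that the branch condition compares minPartitions with the number of ranges, and that splitting is proportional to the size of each range; the text above only says that ranges are split into smaller ones, so the splitting rule is an illustration and not necessarily what Spark does. All names (`SplitSketch`, `OffsetRange`, `split`, `plan`, `locate`) are hypothetical.

```scala
object SplitSketch {
  final case class OffsetRange(
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long,
      preferredLoc: Option[String] = None) {
    def size: Long = untilOffset - fromOffset
  }

  // Hypothetical helper: cuts one range into `parts` contiguous sub-ranges of near-equal size
  def split(range: OffsetRange, parts: Int): Seq[OffsetRange] = {
    var start = range.fromOffset
    var remaining = range.size
    (0 until parts).map { i =>
      val step = remaining / (parts - i)
      remaining -= step
      val end = start + step
      val sub = range.copy(fromOffset = start, untilOffset = end, preferredLoc = None)
      start = end
      sub
    }
  }

  def plan(
      ranges: Seq[OffsetRange],
      minPartitions: Option[Int],
      locate: OffsetRange => Option[String]): Seq[OffsetRange] =
    minPartitions match {
      // minPartitions defined and not smaller than the number of ranges: split (assumed rule)
      case Some(min) if ranges.nonEmpty && min >= ranges.size =>
        val idealSize = ranges.map(_.size).sum.toDouble / min
        ranges.flatMap { r =>
          val parts = math.max(math.round(r.size / idealSize), 1L).toInt
          split(r, parts)
        }
      // minPartitions undefined or smaller: keep the ranges, only assign preferred locations
      case _ =>
        ranges.map(r => r.copy(preferredLoc = locate(r)))
    }
}
```

In this sketch the sub-ranges of one input range are contiguous and together cover exactly the original offsets, so no record is read twice or skipped.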
=== [[KafkaOffsetRange]] KafkaOffsetRange -- TopicPartition with From and Until Offsets and Optional Preferred Location
KafkaOffsetRange is a case class with the following attributes:

- [[topicPartition]] TopicPartition
- [[fromOffset]] fromOffset offset
- [[untilOffset]] untilOffset offset
- [[preferredLoc]] Optional preferred location
[[size]] KafkaOffsetRange knows the size, i.e. the number of records between the untilOffset and fromOffset offsets.
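The shape of the class can be sketched as below. The local `TopicPartition` is a stand-in for Kafka's class (an assumption that keeps the example free of dependencies), and the sketch assumes that untilOffset is exclusive, so that the size is a plain difference.

```scala
object KafkaOffsetRangeSketch {
  // Stand-in for Kafka's TopicPartition (assumption)
  final case class TopicPartition(topic: String, partition: Int)

  final case class KafkaOffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long,
      preferredLoc: Option[String]) {
    // Number of records between the from and until offsets
    def size: Long = untilOffset - fromOffset
  }
}
```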
=== [[getLocation]] Selecting Preferred Executor for TopicPartition -- getLocation Internal Method
[source, scala]
----
getLocation(
  tp: TopicPartition,
  executorLocations: Seq[String]): Option[String]
----
getLocation...FIXME
NOTE: getLocation is used exclusively when KafkaOffsetRangeCalculator is requested to getRanges.