== KafkaOffsetRangeCalculator
[[minPartitions]][[creating-instance]]
`KafkaOffsetRangeCalculator` takes a single optional argument when created: the minimum number of partitions per executor (`minPartitions`), which is either undefined or greater than `0`.
[[apply]]
When created with a `DataSourceOptions`, `KafkaOffsetRangeCalculator` uses the `minPartitions` option for the <<minPartitions, minimum number of partitions per executor>>.
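As a rough illustration of the constraint on `minPartitions`, the sketch below reads an optional `minPartitions` entry and rejects values that are not greater than `0`. It assumes a plain `Map[String, String]` in place of `DataSourceOptions`, and the names `OffsetRangeCalculatorSketch` and `fromOptions` are hypothetical, not part of Spark.

```scala
// Hypothetical stand-in for KafkaOffsetRangeCalculator; not the Spark class itself.
final case class OffsetRangeCalculatorSketch(minPartitions: Option[Int]) {
  // minPartitions is either undefined or greater than 0
  require(
    minPartitions.forall(_ > 0),
    "minPartitions must be undefined or greater than 0")
}

object OffsetRangeCalculatorSketch {
  // Assumption: a plain Map stands in for DataSourceOptions.
  def fromOptions(options: Map[String, String]): OffsetRangeCalculatorSketch =
    OffsetRangeCalculatorSketch(options.get("minPartitions").map(_.trim.toInt))
}
```

With no `minPartitions` entry the value stays undefined (`None`); an entry of `0` fails the `require` check.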
=== [[getRanges]] Offset Ranges -- `getRanges` Method
[source, scala]
----
getRanges(
  fromOffsets: PartitionOffsetMap,
  untilOffsets: PartitionOffsetMap,
  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]
----
`getRanges` finds the common `TopicPartitions`, i.e. the keys that are present in both the `fromOffsets` and `untilOffsets` collections (their intersection).
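For every common `TopicPartition`, `getRanges` creates a <<KafkaOffsetRange, KafkaOffsetRange>> with the from and until offsets taken from the `fromOffsets` and `untilOffsets` collections (and the <<preferredLoc, preferred location>> undefined). `getRanges` filters out the `TopicPartitions` that have no records to consume, i.e. whose <<size, size>> is not greater than `0`.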
At this point, `getRanges` knows the `TopicPartitions` with records to consume.
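The steps so far (intersect the keys, build one range per common partition, drop the empty ranges) can be sketched as follows. The sketch is self-contained, so `TopicPartition` and `KafkaOffsetRange` are simplified stand-ins for the Kafka and Spark types, `PartitionOffsetMap` is assumed to be a `Map[TopicPartition, Long]`, and `NonEmptyRangesSketch.nonEmptyRanges` is a hypothetical helper name.

```scala
// Simplified stand-ins, assumed here to keep the sketch self-contained.
final case class TopicPartition(topic: String, partition: Int)

final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  // number of records between the until and from offsets
  def size: Long = untilOffset - fromOffset
}

object NonEmptyRangesSketch {
  // Assumption: offsets are kept per TopicPartition in a plain Map.
  type PartitionOffsetMap = Map[TopicPartition, Long]

  def nonEmptyRanges(
      fromOffsets: PartitionOffsetMap,
      untilOffsets: PartitionOffsetMap): Seq[KafkaOffsetRange] = {
    // TopicPartitions that are keys in both collections
    val common = fromOffsets.keySet.intersect(untilOffsets.keySet)
    common.toSeq
      // one range per common TopicPartition, preferred location undefined
      .map(tp => KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None))
      // keep only the ranges with records to consume
      .filter(_.size > 0)
  }
}
```

A partition that appears in only one of the two maps, or whose from and until offsets are equal, does not show up in the result.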
`getRanges` branches off based on the defined <<minPartitions, minimum number of partitions per executor>> and the number of `KafkaOffsetRanges` (`TopicPartitions` with records to consume).
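For the <<minPartitions, minimum number of partitions per executor>> undefined or smaller than the number of `KafkaOffsetRanges` (`TopicPartitions` to consume records from), `getRanges` updates every `KafkaOffsetRange` with the <<getLocation, preferred executor>> (based on the `TopicPartition` and the `executorLocations`).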
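Otherwise (with the <<minPartitions, minimum number of partitions per executor>> defined and not smaller than the number of `KafkaOffsetRanges`), `getRanges` splits `KafkaOffsetRanges` into smaller ones.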
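The two branches can be sketched as below. The text does not say how ranges are split, so the proportional split used here (each range gets a share of `minPartitions` according to its size) is an assumption made for illustration only. `TopicPartition` and `KafkaOffsetRange` are simplified stand-ins, and `SplitRangesSketch`, `split`, `plan` and the `locate` parameter are hypothetical names.

```scala
// Simplified stand-ins, assumed here to keep the sketch self-contained.
final case class TopicPartition(topic: String, partition: Int)

final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  def size: Long = untilOffset - fromOffset
}

object SplitRangesSketch {
  // Splits one range into at most `parts` contiguous sub-ranges of near-equal size.
  def split(range: KafkaOffsetRange, parts: Int): Seq[KafkaOffsetRange] = {
    val n = math.max(1L, math.min(parts.toLong, range.size))
    (0L until n).map { i =>
      val from = range.fromOffset + range.size * i / n
      val until = range.fromOffset + range.size * (i + 1) / n
      // sub-ranges carry no preferred location in this sketch
      range.copy(fromOffset = from, untilOffset = until, preferredLoc = None)
    }
  }

  def plan(
      ranges: Seq[KafkaOffsetRange],
      minPartitions: Option[Int],
      locate: TopicPartition => Option[String]): Seq[KafkaOffsetRange] =
    if (minPartitions.forall(_ < ranges.size)) {
      // minPartitions undefined or already satisfied: only assign a preferred executor
      ranges.map(r => r.copy(preferredLoc = locate(r.topicPartition)))
    } else {
      // Assumption: larger ranges receive proportionally more sub-ranges.
      val total = ranges.map(_.size).sum
      ranges.flatMap { r =>
        val parts = math.max(1L, math.round(r.size.toDouble / total * minPartitions.get)).toInt
        split(r, parts)
      }
    }
}
```

For example, with two ranges of 30 and 10 records and `minPartitions` of `4`, this sketch yields three sub-ranges for the first partition and leaves the second one whole.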
=== [[KafkaOffsetRange]] KafkaOffsetRange -- TopicPartition with From and Until Offsets and Optional Preferred Location
`KafkaOffsetRange` is a case class with the following attributes:

- [[topicPartition]] `TopicPartition`
- [[fromOffset]] `fromOffset` offset
- [[untilOffset]] `untilOffset` offset
- [[preferredLoc]] Optional preferred location
[[size]]
`KafkaOffsetRange` knows the size, i.e. the number of records between the <<untilOffset, untilOffset>> and <<fromOffset, fromOffset>> offsets.
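A minimal sketch of such a case class is shown below. The field names follow the anchors in the attribute list; `TopicPartition` is a simplified stand-in for the Kafka class, and computing `size` as a plain difference of the two offsets is an assumption.

```scala
// Simplified stand-in for Kafka's TopicPartition, assumed for this sketch.
final case class TopicPartition(topic: String, partition: Int)

final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  // Assumption: size is the difference between the until and from offsets.
  def size: Long = untilOffset - fromOffset
}
```

A range from offset `5` until offset `12` therefore has a size of `7` records.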
=== [[getLocation]] Selecting Preferred Executor for TopicPartition -- `getLocation` Internal Method
[source, scala]
----
getLocation(
  tp: TopicPartition,
  executorLocations: Seq[String]): Option[String]
----
`getLocation`...FIXME
NOTE: `getLocation` is used exclusively when `KafkaOffsetRangeCalculator` is requested for the <<getRanges, offset ranges>>.
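The text leaves the details of `getLocation` open. Purely as an illustration of how a stable preferred executor could be chosen from the given signature, the sketch below maps the hash code of a `TopicPartition` onto the list of executor locations. The hashing strategy is an assumption and is not taken from the text; `TopicPartition` is a simplified stand-in, and `PreferredLocationSketch.pickLocation` is a hypothetical name.

```scala
// Simplified stand-in for Kafka's TopicPartition, assumed for this sketch.
final case class TopicPartition(topic: String, partition: Int)

object PreferredLocationSketch {
  // Assumption: the same TopicPartition should map to the same executor
  // as long as the list of executor locations does not change.
  def pickLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] =
    if (executorLocations.isEmpty) None
    else {
      // floorMod keeps the index non-negative even for negative hash codes
      val idx = java.lang.Math.floorMod(tp.hashCode, executorLocations.length)
      Some(executorLocations(idx))
    }
}
```

With no executor locations the result is `None`; otherwise repeated calls with the same arguments return the same location.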