== KafkaMicroBatchInputPartitionReader
`KafkaMicroBatchInputPartitionReader` is an `InputPartitionReader` (of `InternalRows`) that is created when `KafkaMicroBatchInputPartition` is requested for one (as a part of the `InputPartition` contract).
=== Creating Instance
`KafkaMicroBatchInputPartitionReader` takes the following to be created:

- [[offsetRange]] KafkaOffsetRange
- [[executorKafkaParams]] Kafka parameters used for Kafka clients on executors (`Map[String, Object]`)
- [[pollTimeoutMs]] Poll timeout (in ms)
- [[failOnDataLoss]] `failOnDataLoss` flag
- [[reuseKafkaConsumer]] `reuseKafkaConsumer` flag

NOTE: All the input arguments to create a KafkaMicroBatchInputPartitionReader
are exactly the input arguments used to create a KafkaMicroBatchInputPartition.
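
The relationship between the two sets of constructor arguments can be sketched with plain case classes. This is a simplified model rather than the Spark source: `TopicPartition` and the fields of `KafkaOffsetRange` are assumptions made for illustration, and `createPartitionReader` is assumed to be the `InputPartition` method through which a reader is requested.

```scala
// Simplified stand-ins for the Spark classes; only the constructor shape is modelled.
object CreatingReaderSketch {
  // Assumed shape of a topic partition and an offset range (illustration only).
  final case class TopicPartition(topic: String, partition: Int)
  final case class KafkaOffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long)

  final case class KafkaMicroBatchInputPartitionReader(
      offsetRange: KafkaOffsetRange,
      executorKafkaParams: Map[String, Object],
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean,
      reuseKafkaConsumer: Boolean)

  final case class KafkaMicroBatchInputPartition(
      offsetRange: KafkaOffsetRange,
      executorKafkaParams: Map[String, Object],
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean,
      reuseKafkaConsumer: Boolean) {

    // The partition hands its own arguments over to the reader unchanged.
    def createPartitionReader(): KafkaMicroBatchInputPartitionReader =
      KafkaMicroBatchInputPartitionReader(
        offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
  }
}
```

In this model the partition is a serializable description of the work, while the reader is what actually gets created from it when records are about to be read.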
`KafkaMicroBatchInputPartitionReader` initializes the <<internal-properties, internal properties>>.

=== [[next]] next Method

[source, scala]
----
next(): Boolean
----

NOTE: `next` is part of the `InputPartitionReader` contract to proceed to the next record if available (`true`).

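`next` checks whether the <<nextOffset, next offset>> is smaller than the `untilOffset` of the <<rangeToRead, KafkaOffsetRange>> or not.
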
==== [[next-poll]] next Method -- KafkaDataConsumer Polls Records

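If so, `next` requests the <<consumer, KafkaDataConsumer>> to get (poll) the record at the <<nextOffset, next offset>>.

With a new record, `next` requests the <<converter, KafkaRecordToUnsafeRowConverter>> to convert (`toUnsafeRow`) the record to be the <<nextRow, next UnsafeRow>>. `next` sets the <<nextOffset, next offset>> to the offset of the record incremented by one. `next` returns `true`.

With no new record, `next` simply returns `false`.
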
==== [[next-no-poll]] next Method -- No Polling

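If the <<nextOffset, next offset>> is not smaller than the `untilOffset` of the <<rangeToRead, KafkaOffsetRange>>, `next` simply returns `false`.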
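
The two branches of `next` can be sketched in plain Scala without Spark or Kafka on the classpath. Everything below is a hypothetical stand-in: `Record`, `FakeConsumer`, and `Reader` are invented for illustration, a row is modelled as a `String`, and the string interpolation takes the place of the `toUnsafeRow` conversion. The bounds check against `untilOffset` is an assumption about what decides between polling and not polling.

```scala
object NextSketch {
  // Hypothetical stand-ins: a record is just an offset and a value.
  final case class Record(offset: Long, value: String)
  final case class KafkaOffsetRange(fromOffset: Long, untilOffset: Long)

  // Hypothetical in-memory replacement for KafkaDataConsumer.
  final class FakeConsumer(log: Map[Long, Record]) {
    var polls = 0
    // Returns the record at `offset`, or None when nothing is available.
    def get(offset: Long): Option[Record] = {
      polls += 1
      log.get(offset)
    }
  }

  final class Reader(consumer: FakeConsumer, rangeToRead: KafkaOffsetRange) {
    private var nextOffset: Long = rangeToRead.fromOffset
    private var nextRow: Option[String] = None

    def next(): Boolean =
      if (nextOffset < rangeToRead.untilOffset) {
        // Polling branch: ask the consumer for the record at nextOffset.
        consumer.get(nextOffset) match {
          case Some(record) =>
            nextRow = Some(s"row(${record.value})") // stands in for toUnsafeRow
            nextOffset = record.offset + 1
            true
          case None => false
        }
      } else {
        // No-polling branch: the range is exhausted, the consumer is not touched.
        false
      }

    def get(): String =
      nextRow.getOrElse(throw new IllegalStateException("next() has not returned true yet"))
  }
}
```

Counting the polls in `FakeConsumer` makes the second branch observable: once the range is exhausted, further calls to `next` return `false` without asking the consumer again.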
=== [[close]] Closing (Releasing KafkaDataConsumer) -- close Method

[source, scala]
----
close(): Unit
----

NOTE: `close` is part of the Java `Closeable` contract to release resources.

`close` simply requests the <<consumer, KafkaDataConsumer>> to `release`.
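
Because `close` is what releases the consumer, a caller would typically drain a reader inside `try`/`finally`. The sketch below illustrates that calling pattern only: `PartitionReader`, `readAll`, and `InMemoryReader` are hypothetical names, and the `released` flag merely imitates a consumer being released.

```scala
import java.io.Closeable

object CloseSketch {
  // Hypothetical minimal reader contract: next/get plus Closeable.
  trait PartitionReader[T] extends Closeable {
    def next(): Boolean
    def get(): T
  }

  // Drains a reader and always closes it, even if reading fails part-way.
  def readAll[T](reader: PartitionReader[T]): List[T] = {
    val rows = List.newBuilder[T]
    try {
      while (reader.next()) rows += reader.get()
    } finally {
      reader.close()
    }
    rows.result()
  }

  // In-memory reader that records whether its (pretend) consumer was released.
  final class InMemoryReader(items: List[String]) extends PartitionReader[String] {
    private var remaining = items
    private var current: String = ""
    var released = false

    def next(): Boolean = remaining match {
      case head :: tail =>
        current = head
        remaining = tail
        true
      case Nil => false
    }

    def get(): String = current

    def close(): Unit = {
      released = true
    }
  }
}
```

For example, `CloseSketch.readAll(new CloseSketch.InMemoryReader(List("a", "b")))` collects both items and leaves the reader with `released` set.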
=== [[resolveRange]] resolveRange Internal Method

[source, scala]
----
resolveRange(range: KafkaOffsetRange): KafkaOffsetRange
----

`resolveRange`...FIXME

NOTE: `resolveRange` is used exclusively when `KafkaMicroBatchInputPartitionReader` is created.
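
The text leaves the behaviour of `resolveRange` open, so the following is only one plausible reading of its signature, not a description of the actual method. It assumes that negative offsets are sentinels for "earliest" and "latest" that get replaced with the offsets currently available. The sentinel values, `AvailableOffsetRange`, and the extra `available` parameter are assumptions introduced for this sketch.

```scala
object ResolveRangeSketch {
  // Assumed sentinels for unresolved offsets (not stated in the text above).
  val Latest: Long = -1L
  val Earliest: Long = -2L

  final case class KafkaOffsetRange(fromOffset: Long, untilOffset: Long)
  // Hypothetical view of the offsets a consumer currently has available.
  final case class AvailableOffsetRange(earliest: Long, latest: Long)

  // `available` is by-name so the lookup only happens when a sentinel is present.
  def resolveRange(
      range: KafkaOffsetRange,
      available: => AvailableOffsetRange): KafkaOffsetRange =
    if (range.fromOffset < 0 || range.untilOffset < 0) {
      val current = available
      val from =
        if (range.fromOffset < 0) {
          require(range.fromOffset == Earliest, s"unexpected start offset ${range.fromOffset}")
          current.earliest
        } else range.fromOffset
      val until =
        if (range.untilOffset < 0) {
          require(range.untilOffset == Latest, s"unexpected end offset ${range.untilOffset}")
          current.latest
        } else range.untilOffset
      KafkaOffsetRange(from, until)
    } else {
      // Already concrete: return the range untouched.
      range
    }
}
```

Under these assumptions a fully concrete range passes through unchanged and never triggers the lookup of available offsets.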
=== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| consumer
a| [[consumer]] KafkaDataConsumer for the partition (per <<offsetRange, KafkaOffsetRange>>)

Used in <<next, next>>, <<close, close>>, and <<resolveRange, resolveRange>>

| converter
a| [[converter]] KafkaRecordToUnsafeRowConverter

| nextOffset
a| [[nextOffset]] Next offset

| nextRow
a| [[nextRow]] Next UnsafeRow

| rangeToRead
a| [[rangeToRead]] KafkaOffsetRange

|===