KafkaSource

KafkaSource is a streaming source that loads data from Apache Kafka.
Note
Kafka topics are checked for new records every trigger, so there is some noticeable delay between when records arrive at Kafka topics and when a Spark application processes them.
KafkaSource uses the metadata log directory to persist offsets. The directory is the source ID under the sources directory in the checkpointRoot (of the StreamExecution).
Note
The checkpointRoot directory is one of the following:

- checkpointLocation option
- spark.sql.streaming.checkpointLocation configuration property
KafkaSource is created for the kafka data source format (by KafkaSourceProvider).

[[schema]] KafkaSource uses a predefined (fixed) schema (that cannot be changed).
KafkaSource also supports batch Datasets.
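The following is a minimal sketch (not from the original page) of the same kafka format used for a batch Dataset. It assumes a spark-shell session (spark in scope); the topic1 topic and localhost:9092 bootstrap servers are made-up values.

[source, scala]
----
// Batch read over Kafka (spark.read, not spark.readStream)
val kafkaRecords = spark.
  read.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load

// Shows the predefined (fixed) schema of the Kafka source
kafkaRecords.printSchema
----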
Creating Instance

KafkaSource takes the following to be created:

- SQLContext
- KafkaOffsetReader
- Parameters of executors (reading from Kafka)
- Source Options
- Path of Metadata Log (where KafkaSource persists KafkaSourceOffset offsets in JSON format)
- Starting offsets (defined using startingOffsets option; see the sketch after this list)
- failOnDataLoss flag to create KafkaSourceRDDs every trigger and to report an IllegalStateException on data loss
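For illustration, the sketch below (with a made-up topic and servers, assuming a spark-shell session) shows where a user specifies the startingOffsets and failOnDataLoss options from the list above.

[source, scala]
----
// startingOffsets (here: earliest) and failOnDataLoss (here: disabled)
// end up describing how a KafkaSource starts and reacts to missing data.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("failOnDataLoss", "false").
  load
----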
Loading Kafka Records for Streaming Micro-Batch

[source, scala]
----
getBatch(
  start: Option[Offset],
  end: Offset): DataFrame
----

getBatch is part of the Source abstraction.

getBatch creates a streaming DataFrame with a query plan with a LogicalRDD logical operator to scan data from a KafkaSourceRDD.
Internally, getBatch initializes the initial partition offsets (unless initialized already).
You should see the following INFO message in the logs:
GetBatch called with start = [start], end = [end]
getBatch requests KafkaSourceOffset for end partition offsets for the input end offset (known as untilPartitionOffsets).

getBatch requests KafkaSourceOffset for start partition offsets for the input start offset (if defined) or uses the initial partition offsets (known as fromPartitionOffsets).
getBatch finds the new partitions (as the difference between the topic partitions in untilPartitionOffsets and fromPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.

getBatch reports a data loss if the new partitions do not match what the KafkaOffsetReader fetched:

Cannot find earliest offsets of [partitions]. Some data may have been missed
You should see the following INFO message in the logs:
Partitions added: [newPartitionOffsets]
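A self-contained sketch (plain Scala with a simplified TopicPartition case class, not the actual KafkaSource code) of how the new partitions can be derived as the difference between the two offset maps:

[source, scala]
----
case class TopicPartition(topic: String, partition: Int)

val fromPartitionOffsets  = Map(TopicPartition("topic1", 0) -> 3L)
val untilPartitionOffsets = Map(TopicPartition("topic1", 0) -> 4L, TopicPartition("topic1", 1) -> 0L)

// Partitions that appear in untilPartitionOffsets but not in fromPartitionOffsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
// newPartitions: Set(TopicPartition(topic1,1))
// Their earliest offsets are then fetched from Kafka (the newPartitionOffsets of the INFO message).
----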
getBatch reports a data loss if the new partitions do not start from offset 0:

Added partition [partition] starts from [offset] instead of 0. Some data may have been missed
getBatch reports a data loss if the fromPartitionOffsets partitions differ from the untilPartitionOffsets partitions:

[partitions] are gone. Some data may have been missed
You should see the following DEBUG message in the logs:
TopicPartitions: [topicPartitions]
getBatch gets the sorted executor list (sorted by executorId and host of the registered block managers).

IMPORTANT: That is when getBatch goes very low-level to allow for cached KafkaConsumers in the executors to be re-used to read the same partition in every batch (aka location preference).
You should see the following DEBUG message in the logs:
Sorted executors: [sortedExecutors]
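As a rough, self-contained sketch (plain Scala; the exact ordering is an assumption for illustration), the executors can be sorted by host and then executorId before Kafka partitions are assigned preferred locations:

[source, scala]
----
case class Executor(executorId: String, host: String)

// An assumed ordering: compare hosts first, then executor ids
def compare(a: Executor, b: Executor): Boolean =
  if (a.host == b.host) a.executorId > b.executorId else a.host > b.host

val executors = Seq(Executor("2", "hostB"), Executor("1", "hostA"), Executor("0", "hostB"))

val sortedExecutors = executors.sortWith(compare).map(e => s"${e.host}/${e.executorId}")
// e.g. List(hostB/2, hostB/0, hostA/1) -- a stable order like the one the DEBUG message reports
----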
getBatch creates a KafkaSourceRDDOffsetRange per TopicPartition.

getBatch filters out KafkaSourceRDDOffsetRanges for which until offsets are smaller than from offsets. getBatch reports a data loss for them:

Partition [topicPartition]'s offset was changed from [fromOffset] to [untilOffset], some data may have been missed
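A plain-Scala sketch (with a simplified OffsetRange case class, not the actual KafkaSourceRDDOffsetRange) of the filtering and the data-loss report described above:

[source, scala]
----
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

val offsetRanges = Seq(
  OffsetRange("topic1", 0, 3, 4),
  OffsetRange("topic1", 1, 10, 7)) // until < from => possible data loss

// Keep only ranges whose until offset is not smaller than the from offset
val (kept, dropped) = offsetRanges.partition(r => r.untilOffset >= r.fromOffset)

dropped.foreach { r =>
  println(s"Partition ${r.topic}-${r.partition}'s offset was changed from " +
    s"${r.fromOffset} to ${r.untilOffset}, some data may have been missed")
}
----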
getBatch creates a KafkaSourceRDD (with the parameters of executors, the pollTimeoutMs and the reuseKafkaConsumer flag enabled) and maps it to an RDD of InternalRow.

IMPORTANT: getBatch creates a KafkaSourceRDD with the reuseKafkaConsumer flag enabled.
You should see the following INFO message in the logs:
GetBatch generating RDD of offset range: [offsetRanges]
getBatch sets the currentPartitionOffsets internal property to the end partition offsets (if it was empty).

In the end, getBatch creates a streaming DataFrame for the KafkaSourceRDD and the schema.
=== [[getOffset]] Fetching Offsets (From Metadata Log or Kafka Directly) -- getOffset Method
[source, scala]
----
getOffset: Option[Offset]
----
NOTE: getOffset is a part of the ../../Source.md#getOffset[Source Contract].

Internally, getOffset fetches the initial partition offsets (from the metadata log or Kafka directly).
.KafkaSource Initializing initialPartitionOffsets While Fetching Initial Offsets
image::images/KafkaSource-initialPartitionOffsets.png[align="center"]
NOTE: initialPartitionOffsets is a lazy value and is initialized the very first time getOffset is called (which is when StreamExecution MicroBatchExecution.md#constructNextBatch-hasNewData[constructs a streaming micro-batch]).
[source, scala]
----
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// Case 1: Checkpoint directory undefined
// initialPartitionOffsets read from Kafka directly
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load

// Start the streaming query
// dump records to the console every 10 seconds
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Note the temporary checkpoint directory
17/08/07 11:09:29 INFO StreamExecution: Starting [id = 75dd261d-6b62-40fc-a368-9d95d3cb6f5f, runId = f18a5eb5-ccab-4d9d-8a81-befed41a72bd] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-d0055630-24e4-4d9a-8f36-7a12a0f11bc0 to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}

// Stop the streaming query
q.stop

// Case 2: Checkpoint directory defined
// initialPartitionOffsets read from Kafka directly
// since the checkpoint directory is not available yet
// it will be the next time the query is started
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select($"value" cast "string", $"topic", $"partition", $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // ← checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Note the checkpoint directory in use
17/08/07 11:21:25 INFO StreamExecution: Starting [id = b8f59854-61c1-4c2f-931d-62bbaf90ee3b, runId = 70d06a3b-f2b1-4fa8-a518-15df4cf59130] with file:///tmp/checkpoint to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}
...
INFO StreamExecution: Stored offsets for batch 0. Metadata OffsetSeqMetadata(0,1502098526848,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))

// Review the checkpoint location
// $ ls -ltr /tmp/checkpoint/offsets
// total 8
// -rw-r--r-- 1 jacek wheel 248 7 sie 11:21 0
// $ tail -2 /tmp/checkpoint/offsets/0 | jq

// Produce messages to Kafka so the latest offset changes
// And more importantly the offset gets stored to checkpoint location

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------+------+---------+------+
|value                      |topic |partition|offset|
+---------------------------+------+---------+------+
|testing checkpoint location|topic1|0        |2     |
+---------------------------+------+---------+------+

// and one more
// Note the offset

-------------------------------------------
Batch: 2
-------------------------------------------
+------------+------+---------+------+
|value       |topic |partition|offset|
+------------+------+---------+------+
|another test|topic1|0        |3     |
+------------+------+---------+------+

// See what was checkpointed
// $ ls -ltr /tmp/checkpoint/offsets
// total 24
// -rw-r--r-- 1 jacek wheel 248 7 sie 11:35 0
// -rw-r--r-- 1 jacek wheel 248 7 sie 11:37 1
// -rw-r--r-- 1 jacek wheel 248 7 sie 11:38 2
// $ tail -2 /tmp/checkpoint/offsets/2 | jq

// Stop the streaming query
q.stop

// And start over to see what offset the query starts from
// Checkpoint location should have the offsets
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // ← checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// Whoops...console format does not support recovery (!)
// Reported as https://issues.apache.org/jira/browse/SPARK-21667
org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start over.;
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 61 elided

// Change the sink (= output format) to JSON
val q = records.
  writeStream.
  format("json").
  option("path", "/tmp/json-sink").
  option("checkpointLocation", "/tmp/checkpoint"). // ← checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

// Note the checkpoint directory in use
17/08/07 12:09:02 INFO StreamExecution: Starting [id = 02e00924-5f0d-4501-bcb8-80be8a8be385, runId = 5eba2576-dad6-4f95-9031-e72514475edc] with file:///tmp/checkpoint to store the query checkpoint.
...
17/08/07 12:09:02 INFO KafkaSource: GetBatch called with start = Some({"topic1":{"0":3}}), end = {"topic1":{"0":4}}
17/08/07 12:09:02 INFO KafkaSource: Partitions added: Map()
17/08/07 12:09:02 DEBUG KafkaSource: TopicPartitions: topic1-0
17/08/07 12:09:02 DEBUG KafkaSource: Sorted executors:
17/08/07 12:09:02 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(topic1-0,3,4,None)
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Partitions assigned to consumer: [topic1-0]. Seeking to the end.
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Got latest offsets for partition : Map(topic1-0 -> 4)
17/08/07 12:09:03 DEBUG KafkaSource: GetOffset: ArrayBuffer((topic1-0,4))
17/08/07 12:09:03 DEBUG StreamExecution: getOffset took 122 ms
17/08/07 12:09:03 DEBUG StreamExecution: Resuming at batch 3 with committed offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} and available offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}
17/08/07 12:09:03 DEBUG StreamExecution: Stream running from {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} to {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}
----
getOffset requests the KafkaOffsetReader to fetch the latest offsets (known as latest).
NOTE: (Possible performance degradation?) It is possible that getOffset will request the latest offsets from Kafka twice, i.e. while initializing initialPartitionOffsets (when no metadata log is available and the starting offsets are LatestOffsetRangeLimit) and always as part of getOffset itself.
getOffset then calculates the offsets to use, per the maxOffsetsPerTrigger option (as described in the table below).
.getOffset's Offset Calculation per maxOffsetsPerTrigger
[cols="1,1",options="header",width="100%"]
|===
| maxOffsetsPerTrigger
| Offsets

| Unspecified (i.e. None)
| latest

| Defined (but currentPartitionOffsets is empty)
| rateLimit with limit limit, initialPartitionOffsets as from, until as latest

| Defined (and currentPartitionOffsets is available)
| rateLimit with limit limit, currentPartitionOffsets as from, until as latest
|===
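The table above can be read as the following simplified sketch (hypothetical types and function parameters, not the actual KafkaSource code):

[source, scala]
----
case class TopicPartition(topic: String, partition: Int)

def chooseOffsets(
    maxOffsetsPerTrigger: Option[Long],
    currentPartitionOffsets: Option[Map[TopicPartition, Long]],
    initialPartitionOffsets: Map[TopicPartition, Long],
    latest: Map[TopicPartition, Long],
    rateLimit: (Long, Map[TopicPartition, Long], Map[TopicPartition, Long]) => Map[TopicPartition, Long])
  : Map[TopicPartition, Long] =
  maxOffsetsPerTrigger match {
    case None                                           => latest
    case Some(limit) if currentPartitionOffsets.isEmpty => rateLimit(limit, initialPartitionOffsets, latest)
    case Some(limit)                                    => rateLimit(limit, currentPartitionOffsets.get, latest)
  }
----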
You should see the following DEBUG message in the logs:
GetOffset: [offsets]
In the end, getOffset creates a KafkaSourceOffset with offsets (as Map[TopicPartition, Long]).
=== [[fetchAndVerify]] Fetching and Verifying Specific Offsets -- fetchAndVerify Internal Method
[source, scala]
----
fetchAndVerify(specificOffsets: Map[TopicPartition, Long]): KafkaSourceOffset
----
fetchAndVerify requests the KafkaOffsetReader to fetch the given specificOffsets.

fetchAndVerify makes sure that the starting offsets in specificOffsets are the same as in Kafka and reports a data loss otherwise:

startingOffsets for [tp] was [off] but consumer reset to [result(tp)]
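A self-contained sketch (plain Scala, hypothetical helper) of the verification step described above: every requested starting offset must match what the Kafka consumer actually resolved, otherwise the data loss is reported.

[source, scala]
----
case class TopicPartition(topic: String, partition: Int)

def verify(
    specificOffsets: Map[TopicPartition, Long],
    result: Map[TopicPartition, Long],
    reportDataLoss: String => Unit): Unit =
  specificOffsets.foreach { case (tp, off) =>
    if (result(tp) != off) {
      reportDataLoss(s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
    }
  }

// Example: the consumer resolved offset 5 while offset 3 was requested
verify(
  Map(TopicPartition("topic1", 0) -> 3L),
  Map(TopicPartition("topic1", 0) -> 5L),
  message => println(message))
----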
In the end, fetchAndVerify creates a KafkaSourceOffset (with the result of the KafkaOffsetReader).
NOTE: fetchAndVerify is used exclusively when KafkaSource initializes the initial partition offsets.
=== [[initialPartitionOffsets]] Initial Partition Offsets (of 0th Batch) -- initialPartitionOffsets Internal Lazy Property
[source, scala]
----
initialPartitionOffsets: Map[TopicPartition, Long]
----
initialPartitionOffsets is the initial partition offsets for the batch 0 that were already persisted in the metadata log directory (or persisted on demand).

As the very first step, initialPartitionOffsets creates a custom HDFSMetadataLog (of KafkaSourceOffsets metadata) in the metadata log directory.
initialPartitionOffsets requests the HDFSMetadataLog for the metadata of the 0th batch (as KafkaSourceOffset).

If the metadata is available, initialPartitionOffsets requests the metadata for the collection of TopicPartitions and their offsets.
If the metadata could not be found, initialPartitionOffsets creates a new KafkaSourceOffset per the starting offsets (KafkaOffsetRangeLimit), as sketched after this list:

- For EarliestOffsetRangeLimit, initialPartitionOffsets requests the KafkaOffsetReader to fetchEarliestOffsets
- For LatestOffsetRangeLimit, initialPartitionOffsets requests the KafkaOffsetReader to fetchLatestOffsets
- For SpecificOffsetRangeLimit, initialPartitionOffsets requests the KafkaOffsetReader to fetchSpecificOffsets (and report a data loss per the failOnDataLoss flag)
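The decision above, in a simplified and hedged form (hypothetical fetch functions standing in for the KafkaOffsetReader calls):

[source, scala]
----
case class TopicPartition(topic: String, partition: Int)

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(offsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

def initialOffsets(
    startingOffsets: KafkaOffsetRangeLimit,
    fetchEarliestOffsets: () => Map[TopicPartition, Long],
    fetchLatestOffsets: () => Map[TopicPartition, Long],
    fetchSpecificOffsets: Map[TopicPartition, Long] => Map[TopicPartition, Long])
  : Map[TopicPartition, Long] =
  startingOffsets match {
    case EarliestOffsetRangeLimit           => fetchEarliestOffsets()
    case LatestOffsetRangeLimit             => fetchLatestOffsets()
    case SpecificOffsetRangeLimit(specific) => fetchSpecificOffsets(specific)
  }
----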
initialPartitionOffsets requests the custom HDFSMetadataLog to add the offsets to the metadata log (as the metadata of the 0th batch).
initialPartitionOffsets prints out the following INFO message to the logs:
Initial offsets: [offsets]
Note
initialPartitionOffsets is used when KafkaSource is requested for the following:

- Fetching offsets (from the metadata log or Kafka directly)
- Generating a DataFrame with records from Kafka for a streaming micro-batch (when the start offsets are not defined, i.e. before StreamExecution commits the first streaming batch and so nothing is in the committedOffsets registry for a KafkaSource data source yet)
==== [[initialPartitionOffsets-HDFSMetadataLog-serialize]] HDFSMetadataLog.serialize
[source, scala]
----
serialize(
  metadata: KafkaSourceOffset,
  out: OutputStream): Unit
----
serialize requests the OutputStream to write a zero byte (to support Spark 2.1.0 as per SPARK-19517).
serialize creates a BufferedWriter over an OutputStreamWriter over the OutputStream (with UTF_8 charset encoding).
serialize requests the BufferedWriter to write the v1 version indicator followed by a new line.

serialize then requests the KafkaSourceOffset for a JSON-serialized representation and the BufferedWriter to write it out.
In the end, serialize requests the BufferedWriter to flush (the underlying stream).

serialize is part of the HDFSMetadataLog abstraction.
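A minimal, self-contained sketch of the serialization format described above (the JSON string is a stand-in for the KafkaSourceOffset JSON representation; the helper name is made up):

[source, scala]
----
import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

def serializeSketch(offsetsJson: String, out: OutputStream): Unit = {
  out.write(0)  // a zero byte first (Spark 2.1.0 compatibility, SPARK-19517)
  val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
  writer.write("v1")       // version indicator
  writer.newLine()
  writer.write(offsetsJson) // JSON-serialized offsets
  writer.flush()            // flush the underlying stream
}

val out = new ByteArrayOutputStream()
serializeSketch("""{"topic1":{"0":1}}""", out)
----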
=== [[rateLimit]] rateLimit Internal Method
[source, scala]
----
rateLimit(
  limit: Long,
  from: Map[TopicPartition, Long],
  until: Map[TopicPartition, Long]): Map[TopicPartition, Long]
----
rateLimit requests the KafkaOffsetReader to fetch the earliest offsets.
CAUTION: FIXME
NOTE: rateLimit is used exclusively when KafkaSource gets available offsets (in getOffset, with the maxOffsetsPerTrigger option defined).
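Since this section is still a FIXME, the following is only a rough, assumed sketch (plain Scala, not the actual implementation) of what distributing a per-trigger limit proportionally across partitions could look like:

[source, scala]
----
case class TopicPartition(topic: String, partition: Int)

// Split the limit across partitions in proportion to how many records each
// partition has available between from and until (an assumption for illustration).
def rateLimitSketch(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val sizes = until.map { case (tp, end) => tp -> math.max(end - from.getOrElse(tp, end), 0L) }
  val total = sizes.values.sum.toDouble
  if (total < 1) until
  else until.map { case (tp, end) =>
    val begin    = from.getOrElse(tp, end)
    val prorated = (limit * (sizes(tp) / total)).toLong
    tp -> math.min(end, begin + prorated)
  }
}

rateLimitSketch(
  limit = 10,
  from  = Map(TopicPartition("topic1", 0) -> 0L, TopicPartition("topic1", 1) -> 0L),
  until = Map(TopicPartition("topic1", 0) -> 30L, TopicPartition("topic1", 1) -> 10L))
// => offset 7 for partition 0 and offset 2 for partition 1 (the limit split roughly 3:1)
----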
=== [[getSortedExecutorList]] getSortedExecutorList Method
CAUTION: FIXME
=== [[reportDataLoss]] reportDataLoss Internal Method
CAUTION: FIXME
[NOTE]
====
reportDataLoss is used when KafkaSource does the following:

- Fetches and verifies specific offsets
- Generates a DataFrame with records from Kafka for a streaming micro-batch
====
=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| currentPartitionOffsets
| [[currentPartitionOffsets]] Current partition offsets (as Map[TopicPartition, Long])

Initially NONE and set when KafkaSource is requested to get the maximum available offsets or generate a DataFrame with records from Kafka for a streaming micro-batch.

| pollTimeoutMs
a| [[pollTimeoutMs]]

| sc
a| [[sc]] Spark Core's SparkContext (of the SQLContext)

Used when:

- Generating a DataFrame with records from Kafka for a streaming micro-batch (and creating a KafkaSourceRDD)
- Initializing the pollTimeoutMs internal property

|===
Logging

Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaSource logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaSource=ALL
Refer to Logging.