KafkaSink¶
KafkaSink is a streaming sink that KafkaSourceProvider registers as the kafka format.
// start spark-shell or a Spark application with spark-sql-kafka-0-10 module
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import spark.implicits._ // <-- needed for as[String] outside spark-shell
spark.
  readStream.
  format("text").
  load("server-logs/*.out").
  as[String].
  writeStream.
  queryName("server-logs processor").
  format("kafka"). // <-- uses KafkaSink
  option("kafka.bootstrap.servers", "localhost:9092"). // <-- mandatory (the address is an assumption)
  option("topic", "topic1").
  option("checkpointLocation", "/tmp/kafka-sink-checkpoint"). // <-- mandatory
  start
// in another terminal
$ echo hello > server-logs/hello.out
// in the terminal with Spark
FIXME
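One way to check what the sink produced is to read the topic back with a batch query. The sketch below is only an illustration: the broker address localhost:9092, the application name, and the local master are assumptions, and topic1 is the topic used in the example above.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local setup; adjust the master and application name to your environment.
val spark = SparkSession.builder
  .appName("kafka-sink-check")
  .master("local[*]")
  .getOrCreate()

// Batch read of everything currently in the topic.
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING) AS value") // Kafka values arrive as binary

records.show(truncate = false)
```

The query needs the same spark-sql-kafka-0-10 module on the classpath as the streaming query that writes to the topic.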
Creating Instance¶
KafkaSink takes the following when created:
- SQLContext
- Kafka parameters (used on executor) as a map of (String, Object) pairs
- Optional topic name
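Because the topic name is optional, a record can get its destination in two ways. According to the Spark Kafka integration guide, the topic option, when set, takes precedence over a per-row topic column, and one of the two has to be present. A minimal sketch of that rule in plain Scala follows; resolveTopic and its parameter names are hypothetical and not part of Spark.

```scala
// Hypothetical helper (not a Spark API): picks the destination topic for one record.
// sinkTopic models the sink's optional topic; rowTopic models a per-row "topic" column value.
def resolveTopic(sinkTopic: Option[String], rowTopic: Option[String]): String =
  sinkTopic
    .orElse(rowTopic)
    .getOrElse(throw new IllegalStateException(
      "No topic: set the topic option or provide a topic column"))

val fromOption = resolveTopic(Some("topic1"), Some("per-row-topic")) // the option wins
val fromColumn = resolveTopic(None, Some("per-row-topic"))           // falls back to the row
```

Modelling the topic as an Option keeps the "not configured" case explicit instead of relying on null or an empty string.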
addBatch Method¶
addBatch(batchId: Long, data: DataFrame): Unit
Internally, addBatch requests KafkaWriter to write the input data to the topic.
addBatch is part of the Sink abstraction.
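To make the delegation concrete, here is a rough, Spark-free model of a sink whose addBatch hands each batch to a writer. The RecordWriter trait, the SketchSink class, and the use of Seq[String] in place of a DataFrame are all hypothetical simplifications. Skipping batch IDs that were already written is an assumption about how a sink can stay idempotent when a batch is replayed, not a description of KafkaSink's exact code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for KafkaWriter: receives the records of one batch.
trait RecordWriter {
  def write(records: Seq[String], topic: Option[String]): Unit
}

// Hypothetical, simplified sink; Seq[String] replaces the DataFrame of the real signature.
class SketchSink(writer: RecordWriter, topic: Option[String]) {
  private var latestBatchId = -1L

  def addBatch(batchId: Long, data: Seq[String]): Unit =
    if (batchId <= latestBatchId) {
      // Assumed behavior: a replayed batch is skipped so that it is not written twice.
      ()
    } else {
      writer.write(data, topic)
      latestBatchId = batchId
    }
}

// In-memory writer used only to observe what the sink delegates.
val written = ArrayBuffer.empty[(Option[String], String)]
val writer = new RecordWriter {
  def write(records: Seq[String], topic: Option[String]): Unit =
    records.foreach(r => written += ((topic, r)))
}

val sink = new SketchSink(writer, Some("topic1"))
sink.addBatch(0L, Seq("hello"))
sink.addBatch(0L, Seq("hello")) // replay of batch 0 is ignored
sink.addBatch(1L, Seq("world"))
```

Keeping the writer behind a small trait makes the sink easy to exercise without a Kafka broker.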