KafkaIO

KafkaIO is a utility to create PTransforms for reading records from and writing records to Kafka topics.

Reading Kafka Records — read Utility

Read<K, V> read()

read creates a root Read PTransform that produces a PCollection of KafkaRecord<K, V> elements (with keys of type K and values of type V).

Reading Kafka Records — readBytes Utility

Read<byte[], byte[]> readBytes()

readBytes creates a root Read PTransform that produces a PCollection of KafkaRecord<byte[], byte[]> elements (with byte[] keys and values).

readBytes is a specialized read that preconfigures ByteArrayDeserializer for both keys and values.
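As a sketch, a readBytes transform could be configured like the read transform in the demo below, except that no deserializers need to be specified (the topic and bootstrap server below are placeholder values):

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO

// readBytes needs no explicit deserializers since
// ByteArrayDeserializer is preconfigured for keys and values.
// ":9092" and "beam-input" are placeholder values.
val bytesRead = KafkaIO
  .readBytes()
  .withBootstrapServers(":9092")
  .withTopic("beam-input")
```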

Writing Records to Kafka Topic — write Utility

Write<K, V> write()

write creates a Write output PTransform that writes the key-value pairs of a PCollection<KV<K, V>> to a Kafka topic.
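As an illustration, a Write transform could be configured analogously to the read demo below, with serializers in place of deserializers (the topic and bootstrap server are placeholder values):

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringSerializer

// A sketch of configuring a Write transform for String keys and values.
// ":9092" and "beam-output" are placeholder values.
val writeT = KafkaIO
  .write[String, String]()
  .withBootstrapServers(":9092")
  .withTopic("beam-output")
  .withKeySerializer(classOf[StringSerializer])
  .withValueSerializer(classOf[StringSerializer])
```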

Writing Records to Kafka Topic — writeRecords Utility

WriteRecords<K, V> writeRecords()

writeRecords creates a sink PTransform (PTransform<PCollection<ProducerRecord<K, V>>, PDone>).
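A WriteRecords transform accepts a PCollection of Kafka ProducerRecords, so each element can carry its own topic, partition, or timestamp. A minimal configuration sketch (topic and bootstrap server are placeholder values):

```scala
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.StringSerializer

// A sketch of a WriteRecords sink; "beam-output" acts as the
// default topic for ProducerRecords that do not specify one.
// ":9092" and "beam-output" are placeholder values.
val sink = KafkaIO
  .writeRecords[String, String]()
  .withBootstrapServers(":9092")
  .withTopic("beam-output")
  .withKeySerializer(classOf[StringSerializer])
  .withValueSerializer(classOf[StringSerializer])
```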

Demo

import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.Serdes
val records = KafkaIO
  .read[String, String]()
  .withBootstrapServers(":9092")
  .withTopic("beam-input")
  .withKeyDeserializer(Serdes.String.deserializer.getClass)
  .withValueDeserializer(Serdes.String.deserializer.getClass)
import org.apache.beam.sdk.io.kafka.KafkaIO.Read
assert(records.isInstanceOf[Read[_, _]])

import org.apache.beam.sdk.Pipeline
val p = Pipeline.create()

scala> p.apply("Records from beam-input topic", records)
DEBUG Pipeline: Adding KafkaIO.Read to Pipeline#505559573
DEBUG CoderRegistry: Coder for java.lang.String: StringUtf8Coder
DEBUG CoderRegistry: Coder for java.lang.String: StringUtf8Coder
DEBUG Pipeline: Adding Read(KafkaUnboundedSource) to Pipeline#505559573