SparkStructuredStreamingRunner

SparkStructuredStreamingRunner is a PipelineRunner that executes a Beam Pipeline on Apache Spark (using the Structured Streaming framework) and produces a SparkStructuredStreamingPipelineResult.

SparkStructuredStreamingRunner can be configured using SparkStructuredStreamingPipelineOptions.

import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions
val opts = PipelineOptionsFactory.as(classOf[SparkStructuredStreamingPipelineOptions])

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner
opts.setRunner(classOf[SparkStructuredStreamingRunner])

import org.apache.beam.sdk.Pipeline
val p = Pipeline.create(opts)

import org.apache.beam.sdk.io.TextIO
val linesIn = TextIO.read().from("*.txt")
val lines = p.apply(linesIn)

// write the lines back out (no counting happens in this pipeline)
val linesOut = TextIO.write().to("lines")
lines.apply(linesOut)

val r = p.run()
r.waitUntilFinish()

import org.apache.beam.sdk.PipelineResult
assert(r.getState == PipelineResult.State.DONE)
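In a standalone application the options are typically built from command-line arguments instead. A minimal sketch (it assumes the runner module is on the classpath so --runner can be resolved by the runner's simple class name):

import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions

// e.g. the arguments your main method receives
val args = Array("--runner=SparkStructuredStreamingRunner")
val optsFromArgs = PipelineOptionsFactory
  .fromArgs(args: _*)
  .withValidation()
  .as(classOf[SparkStructuredStreamingPipelineOptions])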

TODO How to configure the SparkStructuredStreamingRunner logger?

A SparkStructuredStreamingRunner instance can be created using the create factory method.

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner
val runner = SparkStructuredStreamingRunner.create()
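Like every PipelineRunner, it also exposes a static fromOptions factory method (the one the Beam SDK resolves reflectively when a pipeline is run), which accepts explicit options. A minimal sketch:

import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner

// fromOptions takes options you built yourself (create() above relies on defaults)
val myOpts = PipelineOptionsFactory.as(classOf[SparkStructuredStreamingPipelineOptions])
val runnerFromOpts = SparkStructuredStreamingRunner.fromOptions(myOpts)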

SparkStructuredStreamingRunner supports two modes of execution:

  • batch (default)

  • streaming

Unbounded PCollections in a Pipeline automatically switch execution from the default batch mode to streaming mode. You can also turn streaming mode on explicitly using the streaming option (part of StreamingOptions).
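For example, to turn streaming mode on explicitly (a minimal sketch; it goes through StreamingOptions, so it does not depend on whether SparkStructuredStreamingPipelineOptions exposes the setter directly):

import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions

val streamingOpts = PipelineOptionsFactory.as(classOf[SparkStructuredStreamingPipelineOptions])
// turn streaming execution on explicitly
streamingOpts.as(classOf[StreamingOptions]).setStreaming(true)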

PipelineTranslatorStreaming is used for streaming mode, while PipelineTranslatorBatch is used for batch mode.

TODO Review PipelineTranslator to understand how Beam concepts are translated to Spark's.