Demo: Running Pipeline

import org.apache.beam.sdk.Pipeline
// Create a new pipeline; every transform below is attached to this instance.
val p = Pipeline.create()

// DEBUG Pipeline: Creating Pipeline#654533401

scala> :type p

// Create.Values is a root PTransform
// Add the root PTransform to the pipeline
import org.apache.beam.sdk.transforms.Create
// Two file glob patterns serve as the in-memory input elements of the pipeline.
val paths = Create.of("/tmp/txts/*.txt", "/tmp/csvs/*.csv")
// Applying the root transform to the pipeline yields the collection of those patterns.
val filepatterns = p.apply("Read File Patterns", paths)

scala> :type filepatterns

// DEBUG Pipeline: Adding Create.Values to Pipeline#654533401
// DEBUG CoderRegistry: Coder for java.lang.String: StringUtf8Coder
// DEBUG Pipeline: Adding Read(CreateSource) to Pipeline#654533401

scala> println(filepatterns.toString)
Read File Patterns/Read(CreateSource).out [PCollection]

// Print each input element to the console
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn._
// A DoFn that prints every input element to the console.
// NOTE(review): as shown, nothing is emitted to the output PCollection, so
// downstream transforms would receive no elements — confirm whether
// ctx.output(element) is intended here.
val printlnDoFn = new DoFn[String, String]() {
  // Beam discovers the per-element method through this annotation, not by its name.
  @ProcessElement
  def processElement(ctx: ProcessContext): Unit = {
    val element = ctx.element()
    println(s"[printlnDoFn] >>> $element")
  }
}
import org.apache.beam.sdk.transforms.ParDo
// Wrap the DoFn in a ParDo transform so it can be applied to a PCollection.
val doPrintln = ParDo.of(printlnDoFn)

scala> :type doPrintln

// Attach the printing step, named "Println Paths", to the file patterns.
val pathsPrintlnd = filepatterns.apply("Println Paths", doPrintln)

scala> :type pathsPrintlnd

// A DoFn that logs every input element to the console.
// The element is injected directly through the @Element parameter annotation
// instead of being read from a ProcessContext.
val logElementsDoFn = new DoFn[String, String]() {
  // Beam discovers the per-element method through this annotation, not by its name.
  @ProcessElement
  def processElement(@Element element: String): Unit = {
    println(s"[logElementsDoFn] >>> $element")
  }
}
import org.apache.beam.sdk.transforms.ParDo
// Wrap the logging DoFn in a ParDo transform.
val logElements = ParDo.of(logElementsDoFn)

// Chain the logging step after the printing step.
val elementsLogged = pathsPrintlnd.apply("Log Elements to Console", logElements)

// TextIO lives in the io package, which no earlier import in this demo brings into scope.
import org.apache.beam.sdk.io.TextIO
// Describe a text sink targeting "counts.txt"; this only builds the transform,
// nothing is written until the transform is applied and the pipeline is run.
val counts = TextIO.write().to("counts.txt")

scala> :type counts

// TextIO.Write is an output PTransform
import org.apache.beam.sdk.transforms.PTransform
assert(counts.isInstanceOf[PTransform[_, _]])

// Attach the text sink as the last step, after the logging transform.
val out = elementsLogged.apply("Write to counts.txt", counts)

scala> :type out

// The value returned by applying the sink still belongs to the pipeline created at the start.
assert(out.getPipeline == p)

// Run the pipeline; the returned PipelineResult is queried for the final state below.
val result = p.run()

// DEBUG Pipeline: Running Pipeline#654533401 via
// DEBUG TransformHierarchy: Visiting composite node RootNode
// ...
// DEBUG ExecutorServiceParallelExecutor: Pipeline has terminated. Shutting down.

// Block until the pipeline finishes and keep the state it returns.
val state = result.waitUntilFinish()

import org.apache.beam.sdk.PipelineResult.State
// The run is expected to have completed successfully.
assert(state == State.DONE)