Demo: Running a Pipeline

import org.apache.beam.sdk.Pipeline
// Build the pipeline that every step below is attached to.
// Pipeline.create() is called with no arguments; the run log further down shows it executing on the DirectRunner.
val p: Pipeline = Pipeline.create()

// DEBUG Pipeline: Creating Pipeline#654533401

scala> :type p
org.apache.beam.sdk.Pipeline

// Create.Values is a root PTransform: it takes no upstream input.
// Applying it directly to the pipeline (under the step name "Read File Patterns")
// produces the first PCollection of the pipeline.
import org.apache.beam.sdk.transforms.Create
val paths: Create.Values[String] = Create.of("/tmp/txts/*.txt", "/tmp/csvs/*.csv")
val filepatterns = p.apply("Read File Patterns", paths)

scala> :type filepatterns
org.apache.beam.sdk.values.PCollection[String]

// DEBUG Pipeline: Adding Create.Values to Pipeline#654533401
// DEBUG CoderRegistry: Coder for java.lang.String: StringUtf8Coder
// DEBUG Pipeline: Adding Read(CreateSource) to Pipeline#654533401

scala> println(filepatterns.toString)
Read File Patterns/Read(CreateSource).out [PCollection]

// Print every input element to the console and emit it unchanged,
// so downstream steps still receive all elements.
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn._
val printlnDoFn = new DoFn[String, String]() {
  /** Prints `[printlnDoFn] >>> <element>` for the current element, then outputs that same element. */
  @ProcessElement
  def processElement(ctx: ProcessContext): Unit = {
    val current: String = ctx.element()
    println(s"[printlnDoFn] >>> $current")
    ctx.output(current)
  }
}
import org.apache.beam.sdk.transforms.ParDo
// Wrap the DoFn in a single-output ParDo so it can be applied to a PCollection[String]
val doPrintln: ParDo.SingleOutput[String, String] = ParDo.of(printlnDoFn)

scala> :type doPrintln
org.apache.beam.sdk.transforms.ParDo.SingleOutput[String,String]

// Run the echoing ParDo over the file patterns as the step named "Println Paths"
val pathsPrintlnd =
  filepatterns.apply("Println Paths", doPrintln)

scala> :type pathsPrintlnd
org.apache.beam.sdk.values.PCollection[String]

// Log every element to the console and forward it unchanged to the next step.
val logElementsDoFn = new DoFn[String, String]() {
  /** Prints `[logElementsDoFn] >>> <element>` and emits the element through `out`. */
  @ProcessElement
  def processElement(@Element element: String, out: OutputReceiver[String]): Unit = {
    println(s"[logElementsDoFn] >>> $element")
    // Forward the element: without an output, this step's PCollection would be empty
    // and the "Write to counts.txt" sink applied to it would have nothing to write.
    out.output(element)
  }
}
import org.apache.beam.sdk.transforms.ParDo
val logElements = ParDo.of(logElementsDoFn)

val elementsLogged = pathsPrintlnd.apply("Log Elements to Console", logElements)

import org.apache.beam.sdk.io.TextIO
// Text sink for the strings it is applied to, targeting "counts.txt".
// NOTE(review): per the TextIO docs, `to` takes a filename prefix, so the output may be sharded
// files named after "counts.txt" rather than a single file. Verify against the Beam version in use.
val counts: TextIO.Write = TextIO.write().to("counts.txt")

scala> :type counts
org.apache.beam.sdk.io.TextIO.Write

// TextIO.Write is an output PTransform (a sink).
// PTransform has to be in scope for the type test below to compile.
import org.apache.beam.sdk.transforms.PTransform
assert(counts.isInstanceOf[PTransform[_, _]])

// Attach the text sink as the step "Write to counts.txt".
// Applying a sink yields a PDone rather than another PCollection.
val out =
  elementsLogged.apply("Write to counts.txt", counts)

scala> :type out
org.apache.beam.sdk.values.PDone

// The sink's output belongs to the very pipeline created at the start
assert(p == out.getPipeline)

// Execute the pipeline; the returned handle is used below to wait for completion
val result = p.run()

// DEBUG Pipeline: Running Pipeline#654533401 via org.apache.beam.runners.direct.DirectRunner@5f23cad5
// DEBUG TransformHierarchy: Visiting composite node RootNode
// ...
// DEBUG ExecutorServiceParallelExecutor: Pipeline has terminated. Shutting down.

// Block until the run is over and capture the state it ended in
val state = result.waitUntilFinish()

import org.apache.beam.sdk.PipelineResult.State
// A successful run finishes in the DONE state
assert(State.DONE == state)