import org.apache.beam.sdk.Pipeline
// The pipeline that every transform in this session is attached to.
val p: Pipeline = Pipeline.create()
// DEBUG Pipeline: Creating Pipeline#654533401
scala> :type p
org.apache.beam.sdk.Pipeline
// Create.Values is a root PTransform
// Add the root PTransform to the pipeline
import org.apache.beam.sdk.transforms.Create
// Root transform holding the two file patterns as in-memory values.
val paths: Create.Values[String] = Create.of("/tmp/txts/*.txt", "/tmp/csvs/*.csv")
// Applying the root transform to the pipeline yields the first PCollection.
val filepatterns: org.apache.beam.sdk.values.PCollection[String] =
  p.apply("Read File Patterns", paths)
scala> :type filepatterns
org.apache.beam.sdk.values.PCollection[String]
// DEBUG Pipeline: Adding Create.Values to Pipeline#654533401
// DEBUG CoderRegistry: Coder for java.lang.String: StringUtf8Coder
// DEBUG Pipeline: Adding Read(CreateSource) to Pipeline#654533401
scala> println(filepatterns.toString)
Read File Patterns/Read(CreateSource).out [PCollection]
// Println the input to the console
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn._
// Pass-through DoFn: prints each incoming element to the console and then
// re-emits the same element so downstream transforms still receive it.
val printlnDoFn = new DoFn[String, String]() {
  @ProcessElement
  def processElement(ctx: ProcessContext): Unit = {
    val current = ctx.element()
    println(s"[printlnDoFn] >>> $current")
    ctx.output(current)
  }
}
import org.apache.beam.sdk.transforms.ParDo
// Wrap the DoFn in a ParDo so it can be applied to a PCollection[String].
val doPrintln: ParDo.SingleOutput[String, String] = ParDo.of(printlnDoFn)
scala> :type doPrintln
org.apache.beam.sdk.transforms.ParDo.SingleOutput[String,String]
// Attach the printing step to the file patterns under the name "Println Paths".
val pathsPrintlnd: org.apache.beam.sdk.values.PCollection[String] =
  filepatterns.apply("Println Paths", doPrintln)
scala> :type pathsPrintlnd
org.apache.beam.sdk.values.PCollection[String]
// Logs every element to the console and forwards it downstream unchanged.
// The DoFn is declared as DoFn[String, String]; without the receiver.output
// call it would emit nothing, so the PCollection it produces (and anything
// written from that PCollection) would be empty.
val logElementsDoFn = new DoFn[String, String]() {
  @ProcessElement
  def processElement(@Element element: String, receiver: OutputReceiver[String]): Unit = {
    println(s"[logElementsDoFn] >>> $element")
    receiver.output(element)
  }
}
import org.apache.beam.sdk.transforms.ParDo
// Wrap the logging DoFn in a ParDo and attach it after the printing step.
val logElements: ParDo.SingleOutput[String, String] = ParDo.of(logElementsDoFn)
val elementsLogged: org.apache.beam.sdk.values.PCollection[String] =
  pathsPrintlnd.apply("Log Elements to Console", logElements)
import org.apache.beam.sdk.io.TextIO
// Write transform targeting the "counts.txt" output prefix.
val counts: TextIO.Write = TextIO.write().to("counts.txt")
scala> :type counts
org.apache.beam.sdk.io.TextIO.Write
// TextIO.Write is an output PTransform
// Confirm that the write transform is itself a PTransform. The type is fully
// qualified because PTransform is not among the imports shown in this session,
// so the bare name would not resolve.
assert(counts.isInstanceOf[org.apache.beam.sdk.transforms.PTransform[_, _]])
// Apply the write transform to the logged elements under an explicit step name.
val out = elementsLogged.apply("Write to counts.txt", counts)
scala> :type out
org.apache.beam.sdk.values.PDone
// The value returned by the write step must report the pipeline it was applied in.
assert(out.getPipeline == p)
// Start executing the assembled pipeline and keep a handle to the run.
val result: org.apache.beam.sdk.PipelineResult = p.run()
// DEBUG Pipeline: Running Pipeline#654533401 via org.apache.beam.runners.direct.DirectRunner@5f23cad5
// DEBUG TransformHierarchy: Visiting composite node RootNode
// ...
// DEBUG ExecutorServiceParallelExecutor: Pipeline has terminated. Shutting down.
import org.apache.beam.sdk.PipelineResult.State
// Wait for the run to finish and check that it ended in the DONE state.
val state: State = result.waitUntilFinish()
assert(state == State.DONE)