// Demo: Custom PTransform

// The following ConsoleWrite class does not work in the Scala REPL; see the JDK bug report:
// https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8057919
// java.lang.InternalError: Malformed class name
//   at java.lang.Class.getSimpleName(Class.java:1330)
//   at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:763)
// It works perfectly fine in a regular Scala application.

// Another catch: use java.lang.Integer rather than scala.Int,
// as there is no Beam Coder for scala.Int.

import org.apache.beam.sdk.transforms.{DoFn, PTransform}
import org.apache.beam.sdk.transforms.DoFn.{Element, ProcessElement, Setup}
import org.apache.beam.sdk.values.{PCollection, PDone}

/**
 * Displays elements to the standard output (console).
 *
 * A terminal transform: it consumes a `PCollection[Integer]`, prints every
 * element through [[ConsoleWrite#PrintlnFn]] and yields `PDone`.
 */
class ConsoleWrite extends PTransform[PCollection[Integer], PDone] {

  /**
   * Attaches a `ParDo` running [[PrintlnFn]] to `input` and marks the branch as done.
   *
   * Uses the `override` modifier rather than the Java `@Override` annotation,
   * because only the modifier is checked by the Scala compiler.
   *
   * @param input the collection of integers to print
   * @return `PDone` bound to the pipeline that `input` belongs to
   */
  override def expand(input: PCollection[Integer]): PDone = {
    import org.apache.beam.sdk.transforms.ParDo
    // The ParDo output is intentionally discarded; printing is the only effect wanted here.
    input.apply(ParDo.of(new PrintlnFn))
    PDone.in(input.getPipeline)
  }

  /**
   * `DoFn` that prints a marker line from its `@Setup` method and one line
   * per element from its `@ProcessElement` method. It never emits output.
   */
  class PrintlnFn extends DoFn[Integer, Unit] {

    /** Prints a marker line when the `@Setup` method is invoked. */
    @Setup
    def setup(): Unit = {
      println(">>> PrintlnFn.setup")
    }

    /**
     * Prints the given element to standard output.
     *
     * @param e the element passed in via the `@Element` parameter
     */
    @ProcessElement
    def process(@Element e: Integer): Unit = {
      println(s">>> PrintlnFn.process($e)")
    }
  }
}

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.transforms.Create

// Pipeline that the demo transforms are attached to.
val pipeline = Pipeline.create()

// Source step: a fixed set of five boxed integers (java.lang.Integer, not scala.Int).
val numbers = pipeline.apply("Input Numbers", Create.of[Integer](0, 1, 2, 3, 4))

// Sink step: hand every element to the custom ConsoleWrite transform.
numbers.apply("Println Elements to Console", new ConsoleWrite)

// Start the pipeline and block until it has finished.
val result = pipeline.run()
result.waitUntilFinish()