// The following ConsoleWrite class refuses to work in the Scala REPL
// https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8057919
// java.lang.InternalError: Malformed class name
// at java.lang.Class.getSimpleName(Class.java:1330)
// at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:763)
// Works perfectly fine in a regular Scala app
// Another catch: use java.lang.Integer rather than scala.Int,
// because there is no Beam Coder for scala.Int
/**
* Displays elements to the standard output (console)
*/
import org.apache.beam.sdk.transforms.{DoFn, PTransform}
import org.apache.beam.sdk.transforms.DoFn.{Element, ProcessElement, Setup}
import org.apache.beam.sdk.values.{PCollection, PDone}
/**
 * Displays the elements of a `PCollection[Integer]` on the standard output (console).
 *
 * Each element is printed by [[ConsoleWrite.PrintlnFn]], which is applied through
 * `ParDo`. The transform produces no further collection: it returns a `PDone`.
 */
class ConsoleWrite extends PTransform[PCollection[Integer], PDone] {

  /**
   * Applies the printing `DoFn` to `input` and finishes this branch of the pipeline.
   *
   * Declared with Scala's `override` modifier (instead of the Java `@Override`
   * annotation) so that the compiler rejects the method if its signature ever
   * stops matching `PTransform.expand`.
   *
   * @param input the collection whose elements are printed
   * @return a `PDone` created in the pipeline that `input` belongs to
   */
  override def expand(input: PCollection[Integer]): PDone = {
    import org.apache.beam.sdk.transforms.ParDo
    // The result of apply is intentionally discarded; PrintlnFn is used for its printing only.
    input.apply(ParDo.of(new PrintlnFn))
    PDone.in(input.getPipeline)
  }

  /** A `DoFn` that prints every element it is given and outputs nothing. */
  class PrintlnFn extends DoFn[Integer, Unit] {

    /** Prints a marker line when Beam invokes the `@Setup` hook. */
    @Setup
    def setup(): Unit = {
      println(">>> PrintlnFn.setup")
    }

    /**
     * Prints a single element.
     *
     * @param e the element currently being processed
     */
    @ProcessElement
    def process(@Element e: Integer): Unit = {
      println(s">>> PrintlnFn.process($e)")
    }
  }
}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.transforms.Create

// Build a pipeline that feeds five boxed integers into ConsoleWrite.
val pipeline = Pipeline.create()

// Source step: an in-memory collection of java.lang.Integer values.
val numbers = pipeline.apply("Input Numbers", Create.of[Integer](0, 1, 2, 3, 4))

// Sink step: print each element to the console.
numbers.apply("Println Elements to Console", new ConsoleWrite)

// Execute the pipeline and block until it has finished.
pipeline.run().waitUntilFinish()