Pipeline

Pipeline is a directed acyclic graph (DAG) of PTransforms and PCollections.

Pipeline can be run using a PipelineRunner.

Creating Pipeline

Pipeline create() (1)
Pipeline create(
  PipelineOptions options)
(1) Uses PipelineOptionsFactory to create a PipelineOptions

Pipeline.create utility creates a new Pipeline using the given PipelineOptions or, when none are given, a PipelineOptions that PipelineOptionsFactory is requested to create.

create prints out the following DEBUG message to the logs:

Creating [pipeline]

Running Pipeline

PipelineResult run() (1)
PipelineResult run(
  PipelineOptions options)
(1) Uses the default options (the PipelineOptions the Pipeline was created with)

run prints out the following DEBUG message to the logs:

Running [this] via [runner]

run validates the options and requests the PipelineRunner to run this pipeline.
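The create and run flow described so far can be sketched without Beam on the classpath. The following is a simplified model, not Beam code: SketchOptions and SketchPipeline are hypothetical stand-ins for PipelineOptions and Pipeline, the runner is reduced to a name, and the validation rule (a non-empty runner name) is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PipelineOptions: only carries a runner name.
final class SketchOptions {
  final String runnerName;

  SketchOptions(String runnerName) {
    this.runnerName = runnerName;
  }

  // Plays the role of asking a factory for default options.
  static SketchOptions defaults() {
    return new SketchOptions("direct");
  }
}

// Hypothetical stand-in for Pipeline.
final class SketchPipeline {
  final List<String> log = new ArrayList<>();
  private final SketchOptions defaultOptions;

  // Private constructor: instances only come from create(...).
  private SketchPipeline(SketchOptions options) {
    this.defaultOptions = options;
  }

  static SketchPipeline create() {
    return create(SketchOptions.defaults());
  }

  static SketchPipeline create(SketchOptions options) {
    SketchPipeline pipeline = new SketchPipeline(options);
    pipeline.log.add("Creating pipeline");
    return pipeline;
  }

  // Runs with the options given at creation time.
  String run() {
    return run(defaultOptions);
  }

  String run(SketchOptions options) {
    log.add("Running pipeline via " + options.runnerName);
    validate(options);
    // A real runner would execute the graph; the sketch only reports who ran it.
    return "run by " + options.runnerName;
  }

  // Assumed validation rule, for illustration only.
  private void validate(SketchOptions options) {
    if (options.runnerName == null || options.runnerName.isEmpty()) {
      throw new IllegalArgumentException("no runner configured");
    }
  }

  public static void main(String[] args) {
    SketchPipeline pipeline = SketchPipeline.create();
    System.out.println(pipeline.run());
    System.out.println(pipeline.log);
  }
}
```

The private constructor mirrors the rule that a Pipeline is only created through the create utility, and run() falling back to the creation-time options mirrors the no-argument run.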

Validating Pipeline Options

void validate(
  PipelineOptions options)

validate…​FIXME

Creating Instance

Pipeline takes the following to be created:

  • PipelineOptions

  • TransformHierarchy

Pipeline cannot be created directly and can only be created using Pipeline.create utility.

TransformHierarchy

Pipeline is given a TransformHierarchy when created.

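Pipeline uses the TransformHierarchy for the following:

  • applyInternal (to push and pop nodes and to register inputs and outputs)

  • traverseTopologically (to visit all nodes with a PipelineVisitor)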

applyTransform Utility

<
  InputT extends PInput,
  OutputT extends POutput>
OutputT applyTransform(
  InputT input,
  PTransform<? super InputT, OutputT> transform) (1)
<
  InputT extends PInput,
  OutputT extends POutput>
OutputT applyTransform(
  String name,
  InputT input,
  PTransform<? super InputT, OutputT> transform)
(1) Uses the name of the PTransform

applyTransform requests the given PInput for its Pipeline and then requests that Pipeline to applyInternal (with the name, the PInput and the PTransform).

applyTransform is used when:

  • Create.TimestampedValues (PTransform) is requested to expand

  • PInputs are requested to apply a given PTransform

applyInternal Internal Method

<
  InputT extends PInput,
  OutputT extends POutput>
OutputT applyInternal(
  String name,
  InputT input,
  PTransform<? super InputT, OutputT> transform)

applyInternal requests the TransformHierarchy for the current node and then requests that node for the fully-qualified name of its transform.

applyInternal prints out the following DEBUG message to the logs:

Adding [transform] to [pipeline]

applyInternal requests the TransformHierarchy to push the node.

applyInternal requests the TransformHierarchy to finishSpecifyingInput.

applyInternal requests the given PTransform to expand (with the given PInput). This gives a POutput that is registered with the TransformHierarchy.

In the end, applyInternal requests the TransformHierarchy to pop the node and returns the POutput.

applyInternal is used when Pipeline is requested to applyTransform.
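The push, expand and pop order of applyInternal can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not Beam code: SketchHierarchy and ApplySketch are hypothetical names, the hierarchy is reduced to a stack of full names, the "/" separator for nested names is an assumption, and the input and output bookkeeping (finishSpecifyingInput, registering the POutput) is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for TransformHierarchy: a stack of node full names.
final class SketchHierarchy {
  private final Deque<String> stack = new ArrayDeque<>();
  final List<String> events = new ArrayList<>();

  SketchHierarchy() {
    stack.push(""); // root node with an empty full name
  }

  String currentFullName() {
    return stack.peek();
  }

  void push(String fullName) {
    stack.push(fullName);
    events.add("push " + fullName);
  }

  void pop() {
    events.add("pop " + stack.pop());
  }
}

// Hypothetical stand-in for the part of Pipeline that applies transforms.
final class ApplySketch {
  final SketchHierarchy hierarchy = new SketchHierarchy();

  // Follows the described order: build the name, push, expand, pop.
  <I, O> O apply(String name, I input, Function<I, O> expand) {
    String prefix = hierarchy.currentFullName();
    String fullName = prefix.isEmpty() ? name : prefix + "/" + name;
    hierarchy.push(fullName);
    try {
      return expand.apply(input);
    } finally {
      hierarchy.pop(); // leave the node even if expand fails
    }
  }

  // A composite "Outer" step that applies a nested "Inner" step while expanding.
  static List<String> demo() {
    ApplySketch pipeline = new ApplySketch();
    Function<Integer, Integer> inner = value -> value + 1;
    Function<Integer, Integer> outer = value -> pipeline.apply("Inner", value, inner);
    pipeline.apply("Outer", 1, outer);
    return pipeline.hierarchy.events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the nested apply happens while the outer node is still on the stack, the inner step picks up the outer full name as its prefix, which is why the current node is consulted before the push.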

applyReplacement Method

<
  InputT extends PInput,
  OutputT extends POutput,
  TransformT extends PTransform<? super InputT, OutputT>>
void applyReplacement(
  Node original,
  PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory)

applyReplacement…​FIXME

traverseTopologically Method

void traverseTopologically(
  PipelineVisitor visitor)

traverseTopologically requests the given PipelineVisitor to enterPipeline.

traverseTopologically then requests the TransformHierarchy to visit (all nodes with the PipelineVisitor).

In the end, traverseTopologically requests the given PipelineVisitor to leavePipeline.

traverseTopologically is used when Pipeline is requested to checkNoMoreMatches, replace and validate.
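The enter, visit and leave sequence of traverseTopologically can be sketched as follows. This is a simplified, self-contained model rather than Beam code: SketchVisitor and TraversalSketch are hypothetical names, the visitor has a single visitNode callback, and the nodes are assumed to be stored in topological order already.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical visitor with only the callbacks this sketch needs.
interface SketchVisitor {
  void enterPipeline();

  void visitNode(String nodeName);

  void leavePipeline();
}

// Hypothetical stand-in for the traversal part of Pipeline.
final class TraversalSketch {
  // Assumed to be in topological order already.
  private final List<String> nodes;

  TraversalSketch(List<String> nodes) {
    this.nodes = nodes;
  }

  void traverseTopologically(SketchVisitor visitor) {
    visitor.enterPipeline();
    for (String node : nodes) {
      visitor.visitNode(node);
    }
    visitor.leavePipeline();
  }

  // Records the order in which the visitor callbacks are invoked.
  static List<String> demo() {
    List<String> calls = new ArrayList<>();
    SketchVisitor recorder = new SketchVisitor() {
      @Override
      public void enterPipeline() {
        calls.add("enterPipeline");
      }

      @Override
      public void visitNode(String nodeName) {
        calls.add("visit " + nodeName);
      }

      @Override
      public void leavePipeline() {
        calls.add("leavePipeline");
      }
    };
    new TraversalSketch(Arrays.asList("Read", "Count", "Write")).traverseTopologically(recorder);
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A recording visitor like this one is a convenient way to check that every node is visited exactly once between the enter and leave callbacks.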

validate Internal Method

void validate(
  PipelineOptions options)

validate…​FIXME

validate is used when Pipeline is requested to run.

replaceAll Method

void replaceAll(
  List<PTransformOverride> overrides)

replaceAll…​FIXME

replaceAll is used in tests only.

replace Internal Method

void replace(
  PTransformOverride override)

replace…​FIXME

replace is used when Pipeline is requested to replaceAll.

checkNoMoreMatches Internal Method

void checkNoMoreMatches(
  List<PTransformOverride> overrides)

checkNoMoreMatches…​FIXME

checkNoMoreMatches is used when Pipeline is requested to replaceAll.

Logging

Enable ALL logging level for org.apache.beam.sdk.Pipeline logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.beam.sdk.Pipeline=ALL