Pipeline
Pipeline is a directed acyclic graph (DAG) of PTransforms and PCollections.
Pipeline can be run using a PipelineRunner.
Creating Pipeline
Pipeline create() (1)
Pipeline create(PipelineOptions options)
(1) Uses PipelineOptionsFactory to create a PipelineOptions
The Pipeline.create utility creates a new Pipeline, using the provided PipelineOptions or, when none is given, requesting PipelineOptionsFactory to create one.
create prints out the following DEBUG message to the logs:
Creating [pipeline]
Running Pipeline
PipelineResult run() (1)
PipelineResult run(PipelineOptions options)
(1) Uses the default options
run creates a PipelineRunner (using PipelineRunner.fromOptions).
run prints out the following DEBUG message to the logs:
Running [this] via [runner]
run validates the options and requests the PipelineRunner to run this pipeline.
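The order of steps in run can be modeled in plain Java. This is a simplified, self-contained sketch and not Beam's source: ToyPipeline, ToyOptions, ToyRunner and the log list are hypothetical stand-ins for Pipeline, PipelineOptions, PipelineRunner and the DEBUG logger, and validate is only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pipeline (not the Beam class).
class ToyPipeline {
    final List<String> log = new ArrayList<>();   // stands in for the DEBUG log
    private final ToyOptions defaultOptions;

    ToyPipeline(ToyOptions defaultOptions) {
        this.defaultOptions = defaultOptions;
    }

    // No-argument variant: falls back to the default options.
    String run() {
        return run(defaultOptions);
    }

    String run(ToyOptions options) {
        ToyRunner runner = ToyRunner.fromOptions(options);  // 1. create the runner from the options
        log.add("Running pipeline via " + runner.name);     // 2. log the DEBUG message
        validate(options);                                  // 3. validate
        return runner.run(this);                            // 4. hand the pipeline to the runner
    }

    // Placeholder only: this sketch does not model what validation checks.
    private void validate(ToyOptions options) {
        log.add("validated");
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline(new ToyOptions("direct"));
        System.out.println(pipeline.run());   // prints "DONE"
        System.out.println(pipeline.log);     // prints "[Running pipeline via direct, validated]"
    }
}

// Hypothetical stand-in for PipelineOptions: only carries a runner name.
class ToyOptions {
    final String runnerName;

    ToyOptions(String runnerName) {
        this.runnerName = runnerName;
    }
}

// Hypothetical stand-in for PipelineRunner.
class ToyRunner {
    final String name;

    private ToyRunner(String name) {
        this.name = name;
    }

    // Plays the role of PipelineRunner.fromOptions: the options decide the runner.
    static ToyRunner fromOptions(ToyOptions options) {
        return new ToyRunner(options.runnerName);
    }

    // Returns a string where the real runner would return a PipelineResult.
    String run(ToyPipeline pipeline) {
        return "DONE";
    }
}
```

The point of the sketch is that the runner is chosen from the options at run time, so the same pipeline object can be handed to a different runner by passing different options.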
Creating Instance
Pipeline takes the following to be created:
Pipeline cannot be created directly; it can only be created using the Pipeline.create utility.
TransformHierarchy
Pipeline is given a TransformHierarchy when created.
Pipeline uses the TransformHierarchy for the following:
applyTransform Utility
<InputT extends PInput, OutputT extends POutput>
OutputT applyTransform(
  InputT input,
  PTransform<? super InputT, OutputT> transform) (1)
<InputT extends PInput, OutputT extends POutput>
OutputT applyTransform(
  String name,
  InputT input,
  PTransform<? super InputT, OutputT> transform)
(1) Uses the name of the PTransform
applyTransform requests the given PInput for its Pipeline, which is then requested to applyInternal.
applyTransform is used when:
- Create.TimestampedValues (PTransform) is requested to expand
- PInputs are requested to apply a given PTransform
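The delegation performed by applyTransform can be sketched in plain Java. The types below (SketchPipeline, SketchInput, SketchTransform, SketchBegin, HelloTransform) are hypothetical, simplified stand-ins and not Beam classes; SketchBegin plays the role that PBegin plays in Beam, and applyInternal is reduced to recording the name and expanding the transform.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pipeline (not the Beam class).
class SketchPipeline {
    final List<String> applied = new ArrayList<>();

    // Variant without a name: uses the transform's own name.
    static <I extends SketchInput, O> O applyTransform(
            I input, SketchTransform<? super I, O> transform) {
        return applyTransform(transform.getName(), input, transform);
    }

    // Asks the input for its pipeline, then delegates to applyInternal.
    static <I extends SketchInput, O> O applyTransform(
            String name, I input, SketchTransform<? super I, O> transform) {
        return input.getPipeline().applyInternal(name, input, transform);
    }

    // Reduced to the minimum here: record the name and expand the transform.
    private <I extends SketchInput, O> O applyInternal(
            String name, I input, SketchTransform<? super I, O> transform) {
        applied.add(name);
        return transform.expand(input);
    }

    public static void main(String[] args) {
        SketchPipeline pipeline = new SketchPipeline();
        SketchBegin begin = new SketchBegin(pipeline);
        applyTransform(begin, new HelloTransform());
        applyTransform("Custom", begin, new HelloTransform());
        System.out.println(pipeline.applied);   // prints "[Hello, Custom]"
    }
}

// Hypothetical stand-in for PInput: knows which pipeline it belongs to.
interface SketchInput {
    SketchPipeline getPipeline();
}

// Hypothetical stand-in for PTransform: has a name and an expand step.
interface SketchTransform<I, O> {
    String getName();
    O expand(I input);
}

// Hypothetical root input of a pipeline.
class SketchBegin implements SketchInput {
    private final SketchPipeline pipeline;

    SketchBegin(SketchPipeline pipeline) {
        this.pipeline = pipeline;
    }

    public SketchPipeline getPipeline() {
        return pipeline;
    }
}

// Hypothetical transform used only for the demonstration.
class HelloTransform implements SketchTransform<SketchBegin, String> {
    public String getName() {
        return "Hello";
    }

    public String expand(SketchBegin input) {
        return "hello";
    }
}
```

Because the pipeline is looked up through the input, callers never need to pass the pipeline explicitly when applying a transform.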
applyInternal Internal Method
<InputT extends PInput, OutputT extends POutput>
OutputT applyInternal(
  String name,
  InputT input,
  PTransform<? super InputT, OutputT> transform)
applyInternal requests the TransformHierarchy for the current node, which is then requested for the fully-qualified name of the transform.
applyInternal prints out the following DEBUG message to the logs:
Adding [transform] to [pipeline]
applyInternal requests the TransformHierarchy to push the node.
applyInternal requests the TransformHierarchy to finishSpecifyingInput.
applyInternal requests the given PTransform to expand (with the given PInput). This gives a POutput that is registered with the TransformHierarchy.
applyInternal returns the POutput (and requests the TransformHierarchy to pop the node).
applyInternal is used when Pipeline is requested to applyTransform.
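The push, expand and pop sequence above can be illustrated with a small model. This is a sketch under stated assumptions, not Beam's implementation: ApplyInternalSketch and HierarchySketch are hypothetical stand-ins for Pipeline and TransformHierarchy, a java.util.function.Function stands in for the PTransform's expand step, and the "parent/child" naming scheme and the try/finally around the pop are choices made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Pipeline (not the Beam class).
class ApplyInternalSketch {
    final HierarchySketch transforms = new HierarchySketch();

    <I, O> O applyInternal(String name, I input, Function<I, O> transform) {
        String fullName = transforms.fullNameOf(name);   // name qualified by the current node
        transforms.events.add("Adding " + fullName);     // stands in for the DEBUG message
        transforms.pushNode(fullName);
        try {
            transforms.finishSpecifyingInput();
            O output = transform.apply(input);           // "expand" the transform
            transforms.setOutput(output);                // register the output
            return output;
        } finally {
            transforms.popNode();                        // always leave the node
        }
    }

    public static void main(String[] args) {
        ApplyInternalSketch pipeline = new ApplyInternalSketch();
        Function<Integer, Integer> inner = y -> y + 1;
        // A composite: its expansion applies another transform.
        Function<Integer, Integer> outer = x -> pipeline.applyInternal("Inner", x, inner) * 10;
        System.out.println(pipeline.applyInternal("Outer", 1, outer));   // prints "20"
        pipeline.transforms.events.forEach(System.out::println);
    }
}

// Hypothetical stand-in for TransformHierarchy: a stack of enclosing node names.
class HierarchySketch {
    final List<String> events = new ArrayList<>();
    private final Deque<String> stack = new ArrayDeque<>();

    // Fully-qualified name of a child of the current node.
    String fullNameOf(String name) {
        return stack.isEmpty() ? name : stack.peek() + "/" + name;
    }

    void pushNode(String fullName) {
        stack.push(fullName);
        events.add("push " + fullName);
    }

    void finishSpecifyingInput() {
        events.add("finishSpecifyingInput " + stack.peek());
    }

    void setOutput(Object output) {
        events.add("setOutput " + stack.peek() + " = " + output);
    }

    void popNode() {
        events.add("pop " + stack.pop());
    }
}
```

In the demonstration, the inner transform is applied while the outer node is still on the stack, which is how a composite transform ends up as the parent of the transforms applied during its expansion.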
applyReplacement Method
<InputT extends PInput,
 OutputT extends POutput,
 TransformT extends PTransform<? super InputT, OutputT>>
void applyReplacement(
  Node original,
  PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory)
applyReplacement…FIXME
traverseTopologically Method
void traverseTopologically(PipelineVisitor visitor)
traverseTopologically requests the given PipelineVisitor to enterPipeline.
traverseTopologically then requests the TransformHierarchy to visit (all nodes with the PipelineVisitor).
In the end, traverseTopologically requests the given PipelineVisitor to leavePipeline.
traverseTopologically is used when Pipeline is requested to checkNoMoreMatches, replace and validate.
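The enter, visit and leave sequence of traverseTopologically is a plain visitor pattern and can be sketched as follows. The types are hypothetical and heavily simplified: VisitorSketch collapses the visitor into three callbacks, and TraversalSketch holds a flat list of node names that is assumed to be in topological order already, in place of a real TransformHierarchy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Pipeline (not the Beam class).
class TraversalSketch {
    private final List<String> nodes;   // assumed to be in topological order already

    TraversalSketch(List<String> nodes) {
        this.nodes = nodes;
    }

    void traverseTopologically(VisitorSketch visitor) {
        visitor.enterPipeline();            // 1. announce the start
        for (String node : nodes) {
            visitor.visitNode(node);        // 2. visit every node in order
        }
        visitor.leavePipeline();            // 3. announce the end
    }

    public static void main(String[] args) {
        TraversalSketch pipeline = new TraversalSketch(List.of("Read", "Parse", "Write"));
        RecordingVisitor visitor = new RecordingVisitor();
        pipeline.traverseTopologically(visitor);
        System.out.println(visitor.calls);
        // prints "[enter, visit Read, visit Parse, visit Write, leave]"
    }
}

// Hypothetical, reduced visitor interface.
interface VisitorSketch {
    void enterPipeline();
    void visitNode(String node);
    void leavePipeline();
}

// Visitor that records the order in which its callbacks are invoked.
class RecordingVisitor implements VisitorSketch {
    final List<String> calls = new ArrayList<>();

    public void enterPipeline() {
        calls.add("enter");
    }

    public void visitNode(String node) {
        calls.add("visit " + node);
    }

    public void leavePipeline() {
        calls.add("leave");
    }
}
```

Keeping the traversal in one place and the behavior in the visitor is what lets different operations (such as the checkNoMoreMatches, replace and validate callers listed above) reuse the same walk.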
validate Internal Method
void validate(PipelineOptions options)
validate…FIXME
validate is used when Pipeline is requested to run.
replaceAll Method
void replaceAll(List<PTransformOverride> overrides)
replaceAll…FIXME
replaceAll is used in tests only.
replace Internal Method
void replace(PTransformOverride override)
replace…FIXME
replace is used when Pipeline is requested to replaceAll.
checkNoMoreMatches Internal Method
void checkNoMoreMatches(List<PTransformOverride> overrides)
checkNoMoreMatches…FIXME
checkNoMoreMatches is used when Pipeline is requested to replaceAll.