# PipelinesHandler
`PipelinesHandler` is used to handle pipeline commands in Spark Connect (`SparkConnectPlanner`, precisely).

`PipelinesHandler` acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).
## Handle Pipelines Command
```scala
handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult
```
`handlePipelinesCommand` handles the given pipeline `cmd` command.
| PipelineCommand | Description | Initiator |
|---|---|---|
| `CREATE_DATAFLOW_GRAPH` | Creates a new dataflow graph | `pyspark.pipelines.spark_connect_pipeline` |
| `DROP_DATAFLOW_GRAPH` | Drops a dataflow graph | |
| `DEFINE_DATASET` | Defines a dataset | `SparkConnectGraphElementRegistry` |
| `DEFINE_FLOW` | Defines a flow | `SparkConnectGraphElementRegistry` |
| `START_RUN` | Starts a pipeline run | `pyspark.pipelines.spark_connect_pipeline.start_run` |
| `DEFINE_SQL_GRAPH_ELEMENTS` | Defines SQL graph elements | `SparkConnectGraphElementRegistry` |
`handlePipelinesCommand` reports an `UnsupportedOperationException` for unsupported commands:

```text
[other] not supported
```
`handlePipelinesCommand` is used when:

* `SparkConnectPlanner` (Spark Connect) is requested to `handlePipelineCommand` (for a `PIPELINE_COMMAND` command)
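The dispatch itself is a single match on the command type. The following is a simplified sketch of that branching, assembled from the table above and the method signatures documented below; the `sessionHolder.session` accessor and the response helper are assumptions, not the verbatim Spark source:

```scala
// Simplified sketch of the command dispatch in handlePipelinesCommand.
// Case names follow proto.PipelineCommand.CommandTypeCase; helper calls
// follow the signatures documented on this page (an illustration only).
cmd.getCommandTypeCase match {
  case proto.PipelineCommand.CommandTypeCase.CREATE_DATAFLOW_GRAPH =>
    // the created graph's ID is sent back to the Python client
    val graphId = createDataflowGraph(cmd.getCreateDataflowGraph, sessionHolder.session)
    sendGraphIdResponse(graphId, responseObserver) // hypothetical response helper
  case proto.PipelineCommand.CommandTypeCase.DEFINE_DATASET =>
    defineDataset(cmd.getDefineDataset, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW =>
    defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.DEFINE_SQL_GRAPH_ELEMENTS =>
    defineSqlGraphElements(cmd.getDefineSqlGraphElements, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.START_RUN =>
    startRun(cmd.getStartRun, responseObserver, sessionHolder)
  case other =>
    throw new UnsupportedOperationException(s"$other not supported")
}
```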
### CREATE_DATAFLOW_GRAPH

`handlePipelinesCommand` creates a dataflow graph and sends the graph ID back.
### DROP_DATAFLOW_GRAPH

`handlePipelinesCommand`...FIXME
### DEFINE_DATASET

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Define pipelines dataset cmd received: [cmd]
```

`handlePipelinesCommand` defines a dataset.
### DEFINE_FLOW

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Define pipelines flow cmd received: [cmd]
```

`handlePipelinesCommand` defines a flow.
### START_RUN

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Start pipeline cmd received: [cmd]
```

`handlePipelinesCommand` starts a pipeline run.
### DEFINE_SQL_GRAPH_ELEMENTS

`handlePipelinesCommand`...FIXME
## Start Pipeline Run

```scala
startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit
```
`startRun` is used when `PipelinesHandler` is requested to handle a `START_RUN` pipeline command (`proto.PipelineCommand.CommandTypeCase.START_RUN`).
`startRun` finds the `GraphRegistrationContext` by `dataflowGraphId` in the `DataflowGraphRegistry` (in the given `SessionHolder`).

`startRun` creates a `PipelineEventSender` to send pipeline execution progress events back to the Spark Connect client (the Python pipeline runtime).

`startRun` creates a `PipelineUpdateContextImpl` (with the `PipelineEventSender`).

In the end, `startRun` requests the `PipelineUpdateContextImpl` for the `PipelineExecution` to dry-run the pipeline (for a dry-run command) or run it (for a run command).
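Put together, `startRun` boils down to the following sketch. The lookup helper, the `getDryRun` accessor, the `toDataflowGraph` conversion, and the execution method names are assumptions that mirror the steps above, not the verbatim Spark source:

```scala
// Sketch of startRun; lookup and accessor names are assumptions.
def startRun(
    cmd: proto.PipelineCommand.StartRun,
    responseObserver: StreamObserver[ExecutePlanResponse],
    sessionHolder: SessionHolder): Unit = {
  // 1. Find the graph registered earlier with CREATE_DATAFLOW_GRAPH
  val graphCtx: GraphRegistrationContext =
    lookupDataflowGraph(sessionHolder, cmd.getDataflowGraphId) // hypothetical lookup

  // 2. Progress events are streamed back to the Python pipeline runtime
  val eventSender = new PipelineEventSender(responseObserver, sessionHolder)

  // 3. The update context carries the graph and the event sender
  val updateCtx = new PipelineUpdateContextImpl(graphCtx.toDataflowGraph, eventSender)

  // 4. A dry run only validates the graph; a regular run executes it
  if (cmd.getDryRun) updateCtx.pipelineExecution.dryRunPipeline()
  else updateCtx.pipelineExecution.runPipeline()
}
```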
## Create Dataflow Graph

```scala
createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String
```
`createDataflowGraph` finds the catalog and the database in the given `cmd` command and creates a dataflow graph.

`createDataflowGraph` returns the ID of the created dataflow graph.
## defineSqlGraphElements

```scala
defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit
```
`defineSqlGraphElements` looks up the `GraphRegistrationContext` for the dataflow graph ID (from the given `DefineSqlGraphElements` command and in the given `SessionHolder`).

`defineSqlGraphElements` creates a new `SqlGraphRegistrationContext` (for the `GraphRegistrationContext`) to process the SQL definition file.
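In sketch form (the graph lookup and the SQL-processing method names are assumptions that follow the two steps above):

```scala
// Sketch of defineSqlGraphElements; helper names are assumptions.
def defineSqlGraphElements(
    cmd: proto.PipelineCommand.DefineSqlGraphElements,
    session: SparkSession): Unit = {
  // Find the graph this SQL definition file contributes to
  val graphCtx = lookupDataflowGraph(cmd.getDataflowGraphId) // hypothetical lookup

  // Parse the SQL text and register every dataset and flow it defines
  new SqlGraphRegistrationContext(graphCtx)
    .processSqlFile(cmd.getSqlText, cmd.getSqlFilePath, session)
}
```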
## Define Dataset (Table or View)

```scala
defineDataset(
  dataset: proto.PipelineCommand.DefineDataset,
  sparkSession: SparkSession): Unit
```
`defineDataset` looks up the `GraphRegistrationContext` for the given `dataset` (or throws a `SparkException` if not found).

`defineDataset` branches off based on the `dataset` type:
| Dataset Type | Action |
|---|---|
| `MATERIALIZED_VIEW` or `TABLE` | Registers a table |
| `TEMPORARY_VIEW` | Registers a view |
For unknown types, `defineDataset` reports an `IllegalArgumentException`:

```text
Unknown dataset type: [type]
```
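The branching reduces to a single match on the proto dataset type. A sketch, with hypothetical builder helpers standing in for the table and view definitions assembled from the command's fields:

```scala
// Sketch of the dataset-type branching in defineDataset; the builder
// helpers (tableFrom, viewFrom) are hypothetical.
dataset.getDatasetType match {
  case proto.DatasetType.MATERIALIZED_VIEW | proto.DatasetType.TABLE =>
    graphCtx.registerTable(tableFrom(dataset))
  case proto.DatasetType.TEMPORARY_VIEW =>
    graphCtx.registerView(viewFrom(dataset))
  case other =>
    throw new IllegalArgumentException(s"Unknown dataset type: $other")
}
```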
## Define Flow

```scala
defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit
```
`defineFlow` is used to handle `DEFINE_FLOW` pipeline commands.

`defineFlow` looks up the `GraphRegistrationContext` for the given `flow` (or throws a `SparkException` if not found).
**Implicit Flows**

Implicit flows are flows named after their target dataset (i.e., flows defined as part of dataset creation). Implicit flows can be defined with multi-part identifiers (like the corresponding datasets). Multi-part identifiers are composed of catalog, schema and table parts, separated by `.` (dot), e.g. `my_catalog.my_schema.my_table`.
`defineFlow` creates a flow identifier (for the `flow` name).

`defineFlow` reports an `AnalysisException` if the given `flow` is not an implicit flow, but is defined with a multi-part identifier.
In the end, `defineFlow` registers a flow (with a proper `FlowFunction`).
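The identifier guard and the final registration look roughly like this; the parser, the implicit-flow test, the exception constructor, and the registration builder are all assumptions that follow the steps above:

```scala
// Sketch of defineFlow's identifier guard; helper names are assumptions.
val flowIdentifier = parseIdentifier(flow.getFlowName, sparkSession) // hypothetical parser

// An implicit flow carries the name of its target dataset
val isImplicitFlow = flow.getFlowName == flow.getTargetDatasetName

// Only implicit flows may use multi-part (catalog.schema.name) identifiers
if (!isImplicitFlow && flowIdentifier.nameParts.length > 1) {
  throw new AnalysisException(
    s"Flow ${flow.getFlowName} is not an implicit flow, " +
      "so it may not be defined with a multi-part identifier")
}

// Register the flow with a FlowFunction that plans the relation
// (transformRelationFunc turns the proto Relation into a LogicalPlan)
graphCtx.registerFlow(flowFrom(flow, transformRelationFunc)) // hypothetical builder
```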