# PipelinesHandler

`PipelinesHandler` is used to handle pipeline commands in Spark Connect (`SparkConnectPlanner`, to be precise).

`PipelinesHandler` acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).
## Handle Pipelines Command

```scala
handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult
```

`handlePipelinesCommand` handles the given pipeline `cmd` command.
| PipelineCommand | Description | Initiator |
|---|---|---|
| `CREATE_DATAFLOW_GRAPH` | Creates a new dataflow graph | `pyspark.pipelines.spark_connect_pipeline` |
| `DROP_DATAFLOW_GRAPH` | Drops a pipeline | |
| `DEFINE_DATASET` | Defines a dataset | `SparkConnectGraphElementRegistry` |
| `DEFINE_FLOW` | Defines a flow | `SparkConnectGraphElementRegistry` |
| `START_RUN` | Starts a pipeline run | `pyspark.pipelines.spark_connect_pipeline.start_run` |
| `DEFINE_SQL_GRAPH_ELEMENTS` | Defines SQL graph elements | `SparkConnectGraphElementRegistry` |
`handlePipelinesCommand` reports an `UnsupportedOperationException` for unsupported commands:

```text
[other] not supported
```
`handlePipelinesCommand` is used when:

* `SparkConnectPlanner` (Spark Connect) is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
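The dispatch on the command type can be modeled as a plain lookup table. This is an illustrative sketch, not Spark's actual internal API; the handler bodies and the returned shapes are stand-ins:

```python
# Illustrative model of handlePipelinesCommand's dispatch on the
# command-type case of a proto.PipelineCommand. Unknown command types
# are rejected, mirroring the UnsupportedOperationException above.

def handle_pipelines_command(cmd_type: str, cmd: dict) -> dict:
    handlers = {
        "CREATE_DATAFLOW_GRAPH": lambda c: {"graph_id": "graph-1"},
        "DROP_DATAFLOW_GRAPH": lambda c: {},
        "DEFINE_DATASET": lambda c: {},
        "DEFINE_FLOW": lambda c: {},
        "START_RUN": lambda c: {},
        "DEFINE_SQL_GRAPH_ELEMENTS": lambda c: {},
    }
    handler = handlers.get(cmd_type)
    if handler is None:
        # [other] not supported
        raise NotImplementedError(f"{cmd_type} not supported")
    return handler(cmd)
```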
### CREATE_DATAFLOW_GRAPH

`handlePipelinesCommand` creates a dataflow graph and sends the graph ID back.
### DEFINE_DATASET

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Define pipelines dataset cmd received: [cmd]
```

`handlePipelinesCommand` defines a dataset.
### DEFINE_FLOW

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Define pipelines flow cmd received: [cmd]
```

`handlePipelinesCommand` defines a flow.
### START_RUN

`handlePipelinesCommand` prints out the following INFO message to the logs:

```text
Start pipeline cmd received: [cmd]
```

`handlePipelinesCommand` starts a pipeline run.
## Start Pipeline Run

```scala
startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit
```

`startRun` is used when `PipelinesHandler` is requested to handle a `START_RUN` pipeline command (`proto.PipelineCommand.CommandTypeCase.START_RUN`).

`startRun` finds the `GraphRegistrationContext` by `dataflowGraphId` in the `DataflowGraphRegistry` (in the given `SessionHolder`).

`startRun` creates a `PipelineEventSender` to send pipeline execution progress events back to the Spark Connect client (the Python pipeline runtime).

`startRun` creates a `PipelineUpdateContextImpl` (with the `PipelineEventSender`).

In the end, `startRun` requests the `PipelineUpdateContextImpl` for the `PipelineExecution` to dry-run or run the pipeline, for a `dry-run` or `run` command, respectively.
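The flow of `startRun` can be sketched as follows. All class and function names here are illustrative stand-ins for Spark's internal ones, and the registry is modeled as a plain dictionary:

```python
# Illustrative sketch of startRun: resolve the graph by ID, then either
# validate only (dry run) or execute the full pipeline update.

class PipelineExecution:
    """Stand-in for the internal PipelineExecution."""

    def __init__(self, graph_id: str):
        self.graph_id = graph_id
        self.actions: list = []

    def dry_run_pipeline(self) -> None:
        # Analyze and validate the graph without running any flows.
        self.actions.append("dry-run")

    def run_pipeline(self) -> None:
        # Execute the full pipeline update.
        self.actions.append("run")


def start_run(graph_id: str, dry: bool, registry: dict) -> PipelineExecution:
    # Mirrors the GraphRegistrationContext lookup in DataflowGraphRegistry.
    if graph_id not in registry:
        raise KeyError(f"Dataflow graph not found: {graph_id}")
    execution = PipelineExecution(graph_id)
    if dry:
        execution.dry_run_pipeline()
    else:
        execution.run_pipeline()
    return execution
```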
## Create Dataflow Graph

```scala
createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String
```

`createDataflowGraph` finds the catalog and the database in the given `cmd` command and creates a dataflow graph.

`createDataflowGraph` returns the ID of the created dataflow graph.
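A minimal model of this step, assuming the command's catalog and database fall back to the session's current ones when unset (that defaulting is an assumption of this sketch, as are all names):

```python
import uuid

# Illustrative sketch of createDataflowGraph: resolve catalog/database,
# register a new graph entry, and return the freshly-generated graph ID.

def create_dataflow_graph(cmd: dict, session_defaults: dict, registry: dict) -> str:
    # Assumed defaulting: use the session's current catalog/database
    # when the command does not carry them.
    catalog = cmd.get("default_catalog") or session_defaults["catalog"]
    database = cmd.get("default_database") or session_defaults["database"]
    graph_id = str(uuid.uuid4())
    registry[graph_id] = {"catalog": catalog, "database": database}
    return graph_id
```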
## defineSqlGraphElements

```scala
defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit
```

`defineSqlGraphElements` looks up the `GraphRegistrationContext` for the dataflow graph ID (from the given `DefineSqlGraphElements` command and in the given `SessionHolder`).

`defineSqlGraphElements` creates a new `SqlGraphRegistrationContext` (for the `GraphRegistrationContext`) to process the SQL definition file.
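The lookup-then-delegate shape of this step can be sketched as below; the registry, the `process_sql` callback standing in for `SqlGraphRegistrationContext`, and all parameter names are hypothetical:

```python
# Illustrative sketch of defineSqlGraphElements: find the graph's
# registration context, then hand the SQL definition file to a
# SQL-specific processor that registers the elements it defines.

def define_sql_graph_elements(graph_id, sql_file, sql_text, registry, process_sql):
    context = registry.get(graph_id)
    if context is None:
        raise KeyError(f"Dataflow graph not found: {graph_id}")
    # Stand-in for SqlGraphRegistrationContext processing the SQL file.
    process_sql(context, sql_file, sql_text)
```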
## Define Dataset (Table or View)

```scala
defineDataset(
  dataset: proto.PipelineCommand.DefineDataset,
  sparkSession: SparkSession): Unit
```

`defineDataset` looks up the `GraphRegistrationContext` for the given `dataset` (or throws a `SparkException` if not found).

`defineDataset` branches off based on the `dataset` type:

| Dataset Type | Action |
|---|---|
| `MATERIALIZED_VIEW` or `TABLE` | Registers a table |
| `TEMPORARY_VIEW` | Registers a view |

For unknown types, `defineDataset` reports an `IllegalArgumentException`:

```text
Unknown dataset type: [type]
```
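The branching above can be modeled directly; the graph's registry shape is a stand-in, not Spark's actual data structure:

```python
# Illustrative model of defineDataset's branching on the dataset type:
# tables and materialized views register as tables, temporary views as
# views, and anything else raises (cf. the IllegalArgumentException).

def define_dataset(dataset_type: str, name: str, graph: dict) -> None:
    if dataset_type in ("MATERIALIZED_VIEW", "TABLE"):
        graph.setdefault("tables", []).append(name)
    elif dataset_type == "TEMPORARY_VIEW":
        graph.setdefault("views", []).append(name)
    else:
        raise ValueError(f"Unknown dataset type: {dataset_type}")
```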
## Define Flow

```scala
defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit
```

`defineFlow` looks up the `GraphRegistrationContext` for the given `flow` (or throws a `SparkException` if not found).

!!! note "Implicit Flows"
    An implicit flow is a flow with the name of the target dataset (i.e., one defined as part of dataset creation).

`defineFlow` creates a flow identifier (for the `flow` name).

`defineFlow` reports an `AnalysisException` if the given `flow` is not an implicit flow, yet is defined with a multi-part identifier.

In the end, `defineFlow` registers a flow.
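The implicit-flow check can be sketched as follows. Identifier parsing is simplified to a dot-split, and the graph's registry shape is a stand-in:

```python
# Illustrative sketch of defineFlow's validation: a multi-part flow name
# is only allowed for an implicit flow, i.e. a flow whose name equals
# the target dataset's name (cf. the AnalysisException above).

def define_flow(flow_name: str, target: str, graph: dict) -> None:
    parts = flow_name.split(".")       # simplified identifier parsing
    is_implicit = flow_name == target  # flow named after its target dataset
    if len(parts) > 1 and not is_implicit:
        raise ValueError(
            f"Flow name must be a single-part identifier: {flow_name}")
    graph.setdefault("flows", []).append({"name": flow_name, "target": target})
```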