PipelinesHandler¶
PipelinesHandler is used to handle pipeline commands in Spark Connect (SparkConnectPlanner, to be precise).
PipelinesHandler acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).

Handle Pipelines Command¶
handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult
handlePipelinesCommand handles the given pipeline cmd command.
| PipelineCommand | Description | Initiator | 
|---|---|---|
| CREATE_DATAFLOW_GRAPH | Creates a new dataflow graph | pyspark.pipelines.spark_connect_pipeline | 
| DROP_DATAFLOW_GRAPH | Drops a dataflow graph | |
| DEFINE_OUTPUT | Defines an output (a table, a materialized view, a temporary view or a sink) | SparkConnectGraphElementRegistry | 
| DEFINE_FLOW | Defines a flow | SparkConnectGraphElementRegistry | 
| START_RUN | Starts a pipeline run | pyspark.pipelines.spark_connect_pipeline | 
| DEFINE_SQL_GRAPH_ELEMENTS | Defines the graph elements specified in a SQL definition file | SparkConnectGraphElementRegistry | 
UnsupportedOperationException
handlePipelinesCommand reports an UnsupportedOperationException for unsupported commands:
[other] not supported
handlePipelinesCommand is used when:
- SparkConnectPlanner (Spark Connect) is requested to handlePipelineCommand (for PIPELINE_COMMAND command)
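Under the covers, handlePipelinesCommand is a dispatcher over the command types listed above. The following is a minimal, self-contained sketch of that branching with plain Scala stand-ins (PipelineCommand, CreateDataflowGraph, StartRun and the catch-all below are simplified models for illustration, not the actual Spark Connect protobuf messages):

```scala
// Simplified model of the dispatch in handlePipelinesCommand
// (stand-in types for illustration; not the actual protobuf messages).
trait PipelineCommand
case class CreateDataflowGraph(defaultCatalog: Option[String]) extends PipelineCommand
case class StartRun(graphId: String, dry: Boolean) extends PipelineCommand
// ...the remaining command types are modelled the same way

def handlePipelinesCommand(cmd: PipelineCommand): String = cmd match {
  case CreateDataflowGraph(catalog) =>
    // creates the graph and sends the graph ID back to the client
    s"created dataflow graph (default catalog: ${catalog.getOrElse("<current>")})"
  case StartRun(graphId, dry) =>
    s"started ${if (dry) "dry " else ""}run of dataflow graph $graphId"
  case other =>
    // any other command type is rejected, as described above
    throw new UnsupportedOperationException(s"$other not supported")
}
```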
CREATE_DATAFLOW_GRAPH¶
handlePipelinesCommand creates a dataflow graph and sends the graph ID back.
DROP_DATAFLOW_GRAPH¶
handlePipelinesCommand...FIXME
DEFINE_OUTPUT¶
handlePipelinesCommand prints out the following INFO message to the logs:
Define pipelines output cmd received: [cmd]
handlePipelinesCommand defines an output and responds with the resolved dataset identifier (qualified with a catalog and a database, when specified).
DEFINE_FLOW¶
handlePipelinesCommand prints out the following INFO message to the logs:
Define pipelines flow cmd received: [cmd]
handlePipelinesCommand defines a flow.
START_RUN¶
handlePipelinesCommand prints out the following INFO message to the logs:
Start pipeline cmd received: [cmd]
handlePipelinesCommand starts a pipeline run.
DEFINE_SQL_GRAPH_ELEMENTS¶
handlePipelinesCommand...FIXME
Start Pipeline Run¶
startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit
START_RUN Pipeline Command
startRun is used to handle START_RUN pipeline command.
startRun finds the GraphRegistrationContext by dataflowGraphId in the DataflowGraphRegistry (in the given SessionHolder).
startRun creates a PipelineEventSender to send pipeline execution progress events back to the Spark Connect client (Python pipeline runtime).
startRun creates a PipelineUpdateContextImpl (with the PipelineEventSender).
In the end, startRun requests the PipelineUpdateContextImpl for the PipelineExecution to dry-run or run the pipeline (for a dry-run or a full run, respectively).
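Putting the steps together, the control flow of startRun can be sketched as follows. This is a simplified model under assumed signatures: the GraphRegistrationContext, PipelineEventSender, PipelineUpdateContextImpl and PipelineExecution stand-ins below only mirror the roles described above, not the actual Spark classes.

```scala
// Simplified outline of startRun (stand-in types; not the actual Spark Connect code).
trait GraphRegistrationContext
trait PipelineExecution {
  def runPipeline(): Unit
  def dryRunPipeline(): Unit
}
// Streams pipeline progress events back to the Spark Connect client
class PipelineEventSender(sendToClient: String => Unit) {
  def sendEvent(event: String): Unit = sendToClient(event)
}
class PipelineUpdateContextImpl(graphCtx: GraphRegistrationContext, eventSender: PipelineEventSender) {
  def pipelineExecution: PipelineExecution = ???
}

def startRun(
    graphId: String,
    dry: Boolean,
    registry: Map[String, GraphRegistrationContext],
    sendToClient: String => Unit): Unit = {
  // 1. Find the GraphRegistrationContext by the dataflow graph ID
  val graphCtx = registry.getOrElse(graphId, sys.error(s"Dataflow graph $graphId not found"))
  // 2. Create a PipelineEventSender for progress events
  val eventSender = new PipelineEventSender(sendToClient)
  // 3. Create a PipelineUpdateContextImpl with the event sender
  val updateContext = new PipelineUpdateContextImpl(graphCtx, eventSender)
  // 4. Dry-run or run the pipeline, depending on the command
  if (dry) updateContext.pipelineExecution.dryRunPipeline()
  else updateContext.pipelineExecution.runPipeline()
}
```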
Create Dataflow Graph¶
createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String
CREATE_DATAFLOW_GRAPH Pipeline Command
createDataflowGraph is used to handle CREATE_DATAFLOW_GRAPH pipeline command.
createDataflowGraph gets the catalog (from the given CreateDataflowGraph if defined in the pipeline specification file) or prints out the following INFO message to the logs and uses the current catalog instead.
No default catalog was supplied. Falling back to the current catalog: [currentCatalog].
createDataflowGraph gets the database (from the given CreateDataflowGraph if defined in the pipeline specification file) or prints out the following INFO message to the logs and uses the current database instead.
No default database was supplied. Falling back to the current database: [currentDatabase].
In the end, createDataflowGraph creates a dataflow graph (in the session's DataflowGraphRegistry).
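The fallback logic can be illustrated with a short, hedged sketch. resolveGraphDefaults is a made-up helper name and the Option parameters stand in for the optional fields of CreateDataflowGraph; only spark.catalog.currentCatalog() and spark.catalog.currentDatabase are real Spark APIs here, and the real handler logs at INFO level rather than printing.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch of the catalog/database fallback (hypothetical helper, not Spark source).
def resolveGraphDefaults(
    defaultCatalog: Option[String],   // from the pipeline specification file, if any
    defaultDatabase: Option[String],  // from the pipeline specification file, if any
    spark: SparkSession): (String, String) = {
  val catalog = defaultCatalog.getOrElse {
    val current = spark.catalog.currentCatalog()
    // the real handler uses logInfo for this message
    println(s"No default catalog was supplied. Falling back to the current catalog: $current.")
    current
  }
  val database = defaultDatabase.getOrElse {
    val current = spark.catalog.currentDatabase
    println(s"No default database was supplied. Falling back to the current database: $current.")
    current
  }
  (catalog, database)
}
```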
Define SQL Datasets¶
defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit
DEFINE_SQL_GRAPH_ELEMENTS Pipeline Command
defineSqlGraphElements is used to handle DEFINE_SQL_GRAPH_ELEMENTS pipeline command.
defineSqlGraphElements looks up the GraphRegistrationContext for the dataflow graph ID (from the given DefineSqlGraphElements command and in the given SessionHolder).
defineSqlGraphElements creates a new SqlGraphRegistrationContext (for the GraphRegistrationContext) to process the SQL definition file.
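A rough sketch of that flow, using simplified stand-in types (not the actual Spark classes), could look like this:

```scala
// Simplified model of defineSqlGraphElements (stand-in types; not Spark source).
trait GraphRegistrationContext

class SqlGraphRegistrationContext(graphCtx: GraphRegistrationContext) {
  // Parses the SQL definition file and registers the datasets and flows it defines
  def processSqlFile(sqlText: String, sqlFilePath: String): Unit = ???
}

def defineSqlGraphElements(
    graphId: String,
    sqlFilePath: String,
    sqlText: String,
    registry: Map[String, GraphRegistrationContext]): Unit = {
  // 1. Look up the GraphRegistrationContext for the dataflow graph ID
  val graphCtx = registry.getOrElse(graphId, sys.error(s"Dataflow graph $graphId not found"))
  // 2. Let a new SqlGraphRegistrationContext process the SQL definition file
  new SqlGraphRegistrationContext(graphCtx).processSqlFile(sqlText, sqlFilePath)
}
```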
Define Output¶
defineOutput(
  output: proto.PipelineCommand.DefineOutput,
  sessionHolder: SessionHolder): TableIdentifier
DEFINE_OUTPUT Pipeline Command
defineOutput is used to handle DEFINE_OUTPUT pipeline command.
defineOutput looks up the GraphRegistrationContext for the dataflow graph ID of the given output (or throws a SparkException if not found).
defineOutput branches off based on the output type:
| Dataset Type | Action | 
|---|---|
| MATERIALIZED_VIEW or TABLE | Registers a table | 
| TEMPORARY_VIEW | Registers a view | 
| SINK | Registers a sink | 
IllegalArgumentException
For unknown types, defineOutput reports an IllegalArgumentException:
Unknown output type: [type]
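The branching can be pictured with the following hedged sketch (output types are modelled as plain strings for illustration; the real code matches on the protobuf output type):

```scala
// Simplified model of the output-type branching in defineOutput (not Spark source).
def registerOutput(outputType: String, name: String): String = outputType match {
  case "MATERIALIZED_VIEW" | "TABLE" => s"registered table $name"
  case "TEMPORARY_VIEW"              => s"registered view $name"
  case "SINK"                        => s"registered sink $name"
  case other =>
    // mirrors defineOutput's behaviour for unknown output types
    throw new IllegalArgumentException(s"Unknown output type: $other")
}
```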
Define Flow¶
defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit
DEFINE_FLOW Pipeline Command
defineFlow is used to handle DEFINE_FLOW pipeline command.
defineFlow looks up the GraphRegistrationContext for the given flow (or throws a SparkException if not found).
Implicit Flows
Implicit flows are flows named after their target dataset (i.e., flows defined as part of dataset creation).
Implicit flows can be defined with multi-part identifiers (as the corresponding datasets).
Multi-part identifiers are composed of catalog, schema and table parts (separated by . (dot)).
defineFlow creates a flow identifier (for the flow name).
AnalysisException
defineFlow reports an AnalysisException if the given flow is not an implicit flow, but is defined with a multi-part identifier.
In the end, defineFlow registers a flow (with a proper FlowFunction).
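The identifier handling can be sketched as follows. CatalystSqlParser.parseMultipartIdentifier is a real Spark API that splits a dotted name into its parts; checkFlowName and the IllegalArgumentException are illustrative stand-ins (the actual defineFlow reports an AnalysisException as described above):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Simplified model of defineFlow's identifier check (not Spark source).
def checkFlowName(flowName: String, targetDatasetName: String): Seq[String] = {
  // "catalog.schema.table" => Seq("catalog", "schema", "table")
  val parts = CatalystSqlParser.parseMultipartIdentifier(flowName)
  // An implicit flow is named after the dataset it was defined with
  val isImplicit = flowName == targetDatasetName
  if (!isImplicit && parts.size > 1) {
    // the real handler reports an AnalysisException here
    throw new IllegalArgumentException(
      s"Flow $flowName is not an implicit flow and must not use a multi-part identifier")
  }
  parts
}
```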