
PipelinesHandler

PipelinesHandler is used to handle pipeline commands in Spark Connect (in SparkConnectPlanner, precisely).

PipelinesHandler acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).


Handle Pipelines Command

handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult

handlePipelinesCommand handles the given pipeline command (cmd).

| PipelineCommand | Description | Initiator |
|-----------------|-------------|-----------|
| CREATE_DATAFLOW_GRAPH | Creates a new dataflow graph | pyspark.pipelines.spark_connect_pipeline |
| DROP_DATAFLOW_GRAPH | Drops a dataflow graph | |
| DEFINE_OUTPUT | Defines an output (a table, a materialized view, a temporary view or a sink) | SparkConnectGraphElementRegistry |
| DEFINE_FLOW | Defines a flow | SparkConnectGraphElementRegistry |
| START_RUN | Starts a pipeline run | pyspark.pipelines.spark_connect_pipeline |
| DEFINE_SQL_GRAPH_ELEMENTS | Defines dataflow graph elements from SQL definitions | SparkConnectGraphElementRegistry |
UnsupportedOperationException

handlePipelinesCommand reports an UnsupportedOperationException for unsupported commands:

[other] not supported
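
At its core, the handling is a dispatch on the command's case type. The following is a minimal, self-contained sketch of that idea, with simplified, hypothetical Scala types standing in for the real protobuf messages (the actual handler works on proto.PipelineCommand and delegates to the helpers described below):

```scala
// Simplified, hypothetical stand-ins for the protobuf command messages.
sealed trait PipelineCommand
case class CreateDataflowGraph(defaultCatalog: Option[String]) extends PipelineCommand
case class StartRun(dataflowGraphId: String, dryRun: Boolean) extends PipelineCommand
case class OtherCommand(name: String) extends PipelineCommand

def handlePipelinesCommand(cmd: PipelineCommand): Unit = cmd match {
  case c: CreateDataflowGraph =>
    println(s"Creating dataflow graph (default catalog: ${c.defaultCatalog})")
  case c: StartRun =>
    println(s"Starting run of graph ${c.dataflowGraphId} (dryRun=${c.dryRun})")
  case other =>
    // Unsupported command types are rejected, as in the error message above.
    throw new UnsupportedOperationException(s"$other not supported")
}
```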

handlePipelinesCommand is used when:

  • SparkConnectPlanner (Spark Connect) is requested to handlePipelineCommand (for PIPELINE_COMMAND command)

CREATE_DATAFLOW_GRAPH

handlePipelinesCommand creates a dataflow graph and sends the graph ID back.

DROP_DATAFLOW_GRAPH

handlePipelinesCommand...FIXME

DEFINE_OUTPUT

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines output cmd received: [cmd]

handlePipelinesCommand defines an output and responds with the resolved dataset identifier (qualified with a catalog and a database, when specified).

DEFINE_FLOW

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines flow cmd received: [cmd]

handlePipelinesCommand defines a flow.

START_RUN

handlePipelinesCommand prints out the following INFO message to the logs:

Start pipeline cmd received: [cmd]

handlePipelinesCommand starts a pipeline run.

DEFINE_SQL_GRAPH_ELEMENTS

handlePipelinesCommand...FIXME

Start Pipeline Run

startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit
START_RUN Pipeline Command

startRun is used to handle the START_RUN pipeline command.

startRun finds the GraphRegistrationContext by dataflowGraphId in the DataflowGraphRegistry (in the given SessionHolder).

startRun creates a PipelineEventSender to send pipeline execution progress events back to the Spark Connect client (Python pipeline runtime).

startRun creates a PipelineUpdateContextImpl (with the PipelineEventSender).

In the end, startRun requests the PipelineUpdateContextImpl for the PipelineExecution and either dry-runs the pipeline (for a dry run) or runs it (for a full run).
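
In simplified form, the final step boils down to checking whether the command requests a dry run. A sketch under that assumption, with hypothetical stand-ins for the server-side types:

```scala
// Hypothetical, simplified stand-ins for the server-side types.
trait PipelineExecution {
  def dryRunPipeline(): Unit
  def runPipeline(): Unit
}

case class StartRun(dataflowGraphId: String, dryRun: Boolean)

def startRun(cmd: StartRun, execution: PipelineExecution): Unit =
  if (cmd.dryRun) execution.dryRunPipeline() // validate the graph only
  else execution.runPipeline()               // execute the full update
```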

Create Dataflow Graph

createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String
CREATE_DATAFLOW_GRAPH Pipeline Command

createDataflowGraph is used to handle the CREATE_DATAFLOW_GRAPH pipeline command.

createDataflowGraph takes the default catalog from the given CreateDataflowGraph command (when defined in the pipeline specification file). Otherwise, createDataflowGraph prints out the following INFO message to the logs and falls back to the current catalog:

No default catalog was supplied. Falling back to the current catalog: [currentCatalog].

createDataflowGraph takes the default database from the given CreateDataflowGraph command (when defined in the pipeline specification file). Otherwise, createDataflowGraph prints out the following INFO message to the logs and falls back to the current database:

No default database was supplied. Falling back to the current database: [currentDatabase].

In the end, createDataflowGraph registers a new dataflow graph in the session's DataflowGraphRegistry and returns the graph ID.
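
The catalog and database fallbacks are essentially Option.getOrElse with an INFO log. A minimal sketch, assuming a simplified command type (the field names are illustrative, not the exact protobuf accessors):

```scala
// Hypothetical, simplified command with optional defaults from the pipeline spec.
case class CreateDataflowGraph(
    defaultCatalog: Option[String],
    defaultDatabase: Option[String])

def resolveCatalogAndDatabase(
    cmd: CreateDataflowGraph,
    currentCatalog: String,
    currentDatabase: String): (String, String) = {
  val catalog = cmd.defaultCatalog.getOrElse {
    println(s"No default catalog was supplied. Falling back to the current catalog: $currentCatalog.")
    currentCatalog
  }
  val database = cmd.defaultDatabase.getOrElse {
    println(s"No default database was supplied. Falling back to the current database: $currentDatabase.")
    currentDatabase
  }
  (catalog, database)
}
```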

Define SQL Datasets

defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit
DEFINE_SQL_GRAPH_ELEMENTS Pipeline Command

defineSqlGraphElements is used to handle the DEFINE_SQL_GRAPH_ELEMENTS pipeline command.

defineSqlGraphElements looks up the GraphRegistrationContext for the dataflow graph ID (from the given DefineSqlGraphElements command and in the given SessionHolder).

defineSqlGraphElements creates a new SqlGraphRegistrationContext (for the GraphRegistrationContext) to process the SQL definition file.
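
The lookup-then-process flow can be sketched as follows, with hypothetical, simplified types (the real DataflowGraphRegistry, GraphRegistrationContext and SqlGraphRegistrationContext APIs may differ):

```scala
// Hypothetical, simplified registry keyed by dataflow graph ID.
class GraphRegistrationContext(val graphId: String)

class DataflowGraphRegistry {
  private val graphs = scala.collection.mutable.Map.empty[String, GraphRegistrationContext]
  def register(graph: GraphRegistrationContext): Unit = graphs += graph.graphId -> graph
  def getOrThrow(graphId: String): GraphRegistrationContext =
    graphs.getOrElse(graphId,
      throw new NoSuchElementException(s"Dataflow graph $graphId not found"))
}

// Processes a SQL definition file against the graph's registration context.
class SqlGraphRegistrationContext(graph: GraphRegistrationContext) {
  def processSqlFile(sqlText: String, sqlFilePath: String): Unit =
    println(s"Registering SQL-defined datasets and flows from $sqlFilePath into ${graph.graphId}")
}
```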

Define Output

defineOutput(
  output: proto.PipelineCommand.DefineOutput,
  sessionHolder: SessionHolder): TableIdentifier
DEFINE_OUTPUT Pipeline Command

defineOutput is used to handle the DEFINE_OUTPUT pipeline command.

defineOutput looks up the GraphRegistrationContext for the dataflow graph ID of the given output (or throws a SparkException if not found).

defineOutput branches off based on the output type:

| Output Type | Action |
|-------------|--------|
| MATERIALIZED_VIEW or TABLE | Registers a table |
| TEMPORARY_VIEW | Registers a view |
| SINK | Registers a sink |
IllegalArgumentException

For unknown types, defineOutput reports an IllegalArgumentException:

Unknown output type: [type]
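
The branching can be pictured as a match on the output type, with unknown types rejected. A sketch with simplified, hypothetical types:

```scala
// Hypothetical, simplified output types (mirroring the protobuf enum).
trait OutputType
case object MaterializedView extends OutputType
case object Table extends OutputType
case object TemporaryView extends OutputType
case object Sink extends OutputType

def defineOutput(name: String, outputType: OutputType): Unit = outputType match {
  case MaterializedView | Table => println(s"Registering table $name")
  case TemporaryView            => println(s"Registering view $name")
  case Sink                     => println(s"Registering sink $name")
  case other                    => throw new IllegalArgumentException(s"Unknown output type: $other")
}
```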

Define Flow

defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit
DEFINE_FLOW Pipeline Command

defineFlow is used to handle the DEFINE_FLOW pipeline command.

defineFlow looks up the GraphRegistrationContext for the given flow (or throws a SparkException if not found).

Implicit Flows

Implicit flows are flows with the same name as their target dataset (i.e., flows defined as part of dataset creation).

Implicit flows can be defined with multi-part identifiers (just like the corresponding datasets).

Multi-part identifiers are composed of catalog, schema and table parts, separated by a . (dot), e.g. my_catalog.my_schema.my_table.

defineFlow creates a flow identifier (for the flow name).

AnalysisException

defineFlow reports an AnalysisException if the given flow is not an implicit flow, but is defined with a multi-part identifier.

In the end, defineFlow registers a flow (with a proper FlowFunction).
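
The multi-part identifier check can be sketched as follows (a simplified, hypothetical version; the real code uses Spark's SQL parser and TableIdentifier, and reports an AnalysisException as described above):

```scala
// Hypothetical, simplified validation of a flow name.
def validateFlowIdentifier(flowName: String, isImplicit: Boolean): Seq[String] = {
  val parts = flowName.split('.').toSeq // e.g. Seq("catalog", "schema", "flow")
  if (parts.size > 1 && !isImplicit) {
    throw new IllegalArgumentException(
      s"Flow '$flowName' uses a multi-part identifier but is not an implicit flow")
  }
  parts
}
```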