
PipelinesHandler

PipelinesHandler is used to handle pipeline commands in Spark Connect (in SparkConnectPlanner, to be precise).

PipelinesHandler acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).

Handle Pipelines Command

handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult

handlePipelinesCommand handles the given pipeline cmd command.

| PipelineCommand | Description | Initiator |
|-----------------|-------------|-----------|
| CREATE_DATAFLOW_GRAPH | Creates a new dataflow graph | pyspark.pipelines.spark_connect_pipeline |
| DROP_DATAFLOW_GRAPH | Drops a dataflow graph | |
| DEFINE_DATASET | Defines a dataset | SparkConnectGraphElementRegistry |
| DEFINE_FLOW | Defines a flow | SparkConnectGraphElementRegistry |
| START_RUN | Starts a pipeline run | pyspark.pipelines.spark_connect_pipeline.start_run |
| DEFINE_SQL_GRAPH_ELEMENTS | Defines SQL graph elements | SparkConnectGraphElementRegistry |

handlePipelinesCommand reports an UnsupportedOperationException for any other command type:

[other] not supported

handlePipelinesCommand is used when:

  • SparkConnectPlanner (Spark Connect) is requested to handlePipelineCommand (for PIPELINE_COMMAND command)
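
At its core, handlePipelinesCommand is a single pattern match on the command type that delegates to the per-command handlers described in the sections below. The following is a minimal sketch of that dispatch; the protobuf accessor and builder names (e.g. getCreateDataflowGraph, setDataflowGraphId) approximate the generated API rather than quote the actual source, and the later sketches on this page assume the same imports and scope:

```scala
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connect.service.SessionHolder

def handlePipelinesCommand(
    sessionHolder: SessionHolder,
    cmd: proto.PipelineCommand,
    responseObserver: StreamObserver[ExecutePlanResponse],
    transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult = {
  import proto.PipelineCommand.CommandTypeCase
  cmd.getCommandTypeCase match {
    case CommandTypeCase.CREATE_DATAFLOW_GRAPH =>
      // the one command that answers with a non-default result: the new graph ID
      val graphId = createDataflowGraph(cmd.getCreateDataflowGraph, sessionHolder.session)
      PipelineCommandResult.newBuilder()
        .setCreateDataflowGraphResult(
          PipelineCommandResult.CreateDataflowGraphResult.newBuilder()
            .setDataflowGraphId(graphId))
        .build()
    case CommandTypeCase.DEFINE_DATASET =>
      defineDataset(cmd.getDefineDataset, sessionHolder.session)
      PipelineCommandResult.getDefaultInstance
    case CommandTypeCase.DEFINE_FLOW =>
      defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder.session)
      PipelineCommandResult.getDefaultInstance
    case CommandTypeCase.START_RUN =>
      startRun(cmd.getStartRun, responseObserver, sessionHolder)
      PipelineCommandResult.getDefaultInstance
    // DROP_DATAFLOW_GRAPH and DEFINE_SQL_GRAPH_ELEMENTS are dispatched analogously
    case other =>
      throw new UnsupportedOperationException(s"$other not supported")
  }
}
```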

CREATE_DATAFLOW_GRAPH

handlePipelinesCommand creates a dataflow graph and sends the graph ID back.

DEFINE_DATASET

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines dataset cmd received: [cmd]

handlePipelinesCommand defines a dataset.

DEFINE_FLOW

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines flow cmd received: [cmd]

handlePipelinesCommand defines a flow.

START_RUN

handlePipelinesCommand prints out the following INFO message to the logs:

Start pipeline cmd received: [cmd]

handlePipelinesCommand starts a pipeline run.

Start Pipeline Run

startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit

Figure: START_RUN Pipeline Command

startRun is used when PipelinesHandler is requested to handle proto.PipelineCommand.CommandTypeCase.START_RUN command.

startRun finds the GraphRegistrationContext by dataflowGraphId in the DataflowGraphRegistry (in the given SessionHolder).

startRun creates a PipelineEventSender to send pipeline execution progress events back to the Spark Connect client (Python pipeline runtime).

startRun creates a PipelineUpdateContextImpl (with the PipelineEventSender).

In the end, startRun requests the PipelineExecution (of the PipelineUpdateContextImpl) to dry-run the pipeline for a dry-run command or run the pipeline for a run command.
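
Put together, a minimal sketch of startRun follows. The registry accessor (getDataflowGraphOrThrow), the graph materialization (toDataflowGraph) and the execution entry points (runPipeline, dryRunPipeline) are assumed names that mirror the steps above, not verbatim source:

```scala
private def startRun(
    cmd: proto.PipelineCommand.StartRun,
    responseObserver: StreamObserver[ExecutePlanResponse],
    sessionHolder: SessionHolder): Unit = {
  // 1. resolve the graph registered earlier via CREATE_DATAFLOW_GRAPH
  val graphCtx = sessionHolder.dataflowGraphRegistry
    .getDataflowGraphOrThrow(cmd.getDataflowGraphId)
  // 2. progress events are streamed back to the Python pipeline runtime
  val eventSender = new PipelineEventSender(responseObserver, sessionHolder)
  // 3. the update context carries the graph and the event callback for this run
  val updateCtx = new PipelineUpdateContextImpl(graphCtx.toDataflowGraph, eventSender.sendEvent)
  // 4. a dry run only validates the graph; a full run executes the flows
  if (cmd.getDryRun) updateCtx.pipelineExecution.dryRunPipeline()
  else updateCtx.pipelineExecution.runPipeline()
}
```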

Create Dataflow Graph

createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String

createDataflowGraph finds the catalog and the database in the given cmd command and creates a dataflow graph.

createDataflowGraph returns the ID of the created dataflow graph.
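
A minimal sketch, assuming the session-scoped registry exposes a createDataflowGraph factory that generates and returns the graph ID (the registry reference, its factory signature, and the protobuf accessors are assumptions):

```scala
import scala.jdk.CollectionConverters._

private def createDataflowGraph(
    cmd: proto.PipelineCommand.CreateDataflowGraph,
    spark: SparkSession): String = {
  // fall back to the session's current catalog/database when the command leaves them unset
  val defaultCatalog =
    if (cmd.hasDefaultCatalog) cmd.getDefaultCatalog else spark.catalog.currentCatalog()
  val defaultDatabase =
    if (cmd.hasDefaultDatabase) cmd.getDefaultDatabase else spark.catalog.currentDatabase
  // registers a new GraphRegistrationContext and returns its generated ID
  dataflowGraphRegistry.createDataflowGraph(
    defaultCatalog = defaultCatalog,
    defaultDatabase = defaultDatabase,
    defaultSqlConf = cmd.getSqlConfMap.asScala.toMap)
}
```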

defineSqlGraphElements

defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit

defineSqlGraphElements looks up the GraphRegistrationContext for the dataflow graph ID (from the given DefineSqlGraphElements command and in the given SessionHolder).

defineSqlGraphElements creates a new SqlGraphRegistrationContext (for the GraphRegistrationContext) to process the SQL definition file.
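
A minimal sketch (processSqlFile and the command's accessors are assumed names):

```scala
private def defineSqlGraphElements(
    cmd: proto.PipelineCommand.DefineSqlGraphElements,
    session: SparkSession): Unit = {
  // the graph must have been created via CREATE_DATAFLOW_GRAPH first
  val graphCtx = dataflowGraphRegistry.getDataflowGraphOrThrow(cmd.getDataflowGraphId)
  // replays the SQL definition file, registering every dataset and flow it defines
  new SqlGraphRegistrationContext(graphCtx)
    .processSqlFile(cmd.getSqlText, cmd.getSqlFilePath, session)
}
```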

Define Dataset (Table or View)

defineDataset(
  dataset: proto.PipelineCommand.DefineDataset,
  sparkSession: SparkSession): Unit

defineDataset looks up the GraphRegistrationContext for the given dataset (or throws a SparkException if not found).

defineDataset branches off based on the dataset type:

| Dataset Type | Action |
|--------------|--------|
| MATERIALIZED_VIEW or TABLE | Registers a table |
| TEMPORARY_VIEW | Registers a view |

For unknown types, defineDataset reports an IllegalArgumentException:

Unknown dataset type: [type]
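
A minimal sketch of the branching; tableFromCommand and viewFromCommand are hypothetical helpers standing in for the inline builder code that turns the command's fields into a Table or TemporaryView:

```scala
private def defineDataset(
    dataset: proto.PipelineCommand.DefineDataset,
    sparkSession: SparkSession): Unit = {
  val graphCtx = dataflowGraphRegistry.getDataflowGraphOrThrow(dataset.getDataflowGraphId)
  dataset.getDatasetType match {
    case proto.DatasetType.MATERIALIZED_VIEW | proto.DatasetType.TABLE =>
      // materialized views and tables both become table nodes in the graph
      graphCtx.registerTable(tableFromCommand(dataset, sparkSession))  // hypothetical helper
    case proto.DatasetType.TEMPORARY_VIEW =>
      graphCtx.registerView(viewFromCommand(dataset))  // hypothetical helper
    case other =>
      throw new IllegalArgumentException(s"Unknown dataset type: $other")
  }
}
```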

Define Flow

defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit

defineFlow looks up the GraphRegistrationContext for the given flow (or throws a SparkException if not found).

Implicit Flows

An implicit flow is a flow whose name matches that of its target dataset (i.e. a flow defined as part of dataset creation).

defineFlow creates a flow identifier (for the flow name).

AnalysisException

defineFlow reports an AnalysisException if the given flow is not an implicit flow, but is defined with a multi-part identifier.

In the end, defineFlow registers a flow.
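
A minimal sketch tying the steps together; the multi-part check via database.isDefined, the error class name, and the flowFromCommand helper are assumptions for illustration:

```scala
private def defineFlow(
    flow: proto.PipelineCommand.DefineFlow,
    transformRelationFunc: Relation => LogicalPlan,
    sparkSession: SparkSession): Unit = {
  val graphCtx = dataflowGraphRegistry.getDataflowGraphOrThrow(flow.getDataflowGraphId)
  // an implicit flow carries the very name of the dataset it writes to
  val isImplicitFlow = flow.getFlowName == flow.getTargetDatasetName
  val flowIdentifier = GraphIdentifierManager.parseTableIdentifier(flow.getFlowName, sparkSession)
  // only implicit flows may reuse a (possibly multi-part) dataset name
  if (!isImplicitFlow && flowIdentifier.database.isDefined) {
    throw new AnalysisException(
      errorClass = "MULTIPART_FLOW_NAME_NOT_SUPPORTED",  // hypothetical error class
      messageParameters = Map("flowName" -> flow.getFlowName))
  }
  // the flow's relation is turned into a LogicalPlan with transformRelationFunc
  graphCtx.registerFlow(flowFromCommand(flow, flowIdentifier, transformRelationFunc))  // hypothetical helper
}
```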