PipelinesHandler

PipelinesHandler is used to handle pipeline commands in Spark Connect (in SparkConnectPlanner, to be precise).

PipelinesHandler acts as a bridge between the Python execution environment of Spark Declarative Pipelines and the Spark Connect Server (where pipeline execution happens).

Handle Pipelines Command

handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult

handlePipelinesCommand handles the given pipeline command (cmd).

| PipelineCommand | Description | Initiator |
|---|---|---|
| CREATE_DATAFLOW_GRAPH | Creates a new dataflow graph | pyspark.pipelines.spark_connect_pipeline |
| DROP_DATAFLOW_GRAPH | Drops a dataflow graph | |
| DEFINE_DATASET | Defines a dataset | SparkConnectGraphElementRegistry |
| DEFINE_FLOW | Defines a flow | SparkConnectGraphElementRegistry |
| START_RUN | Starts a pipeline run | pyspark.pipelines.spark_connect_pipeline.start_run |
| DEFINE_SQL_GRAPH_ELEMENTS | Defines SQL graph elements | SparkConnectGraphElementRegistry |

handlePipelinesCommand reports an UnsupportedOperationException for unsupported commands:

[other] not supported

handlePipelinesCommand is used when:

  • SparkConnectPlanner (Spark Connect) is requested to handlePipelineCommand (for PIPELINE_COMMAND command)
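
Internally, handlePipelinesCommand is a single pattern match on the command type of the given cmd. The following is a simplified sketch, not the verbatim Spark source: the proto accessors are assumed to follow the standard protobuf-generated naming (getCommandTypeCase, getDefineDataset and the like), and result handling is elided.

cmd.getCommandTypeCase match {
  case proto.PipelineCommand.CommandTypeCase.CREATE_DATAFLOW_GRAPH =>
    // Create a new dataflow graph and send its ID back to the client
    createDataflowGraph(cmd.getCreateDataflowGraph, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.DROP_DATAFLOW_GRAPH =>
    // Drop the graph from the DataflowGraphRegistry (details elided)
  case proto.PipelineCommand.CommandTypeCase.DEFINE_DATASET =>
    defineDataset(cmd.getDefineDataset, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW =>
    defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder.session)
  case proto.PipelineCommand.CommandTypeCase.START_RUN =>
    startRun(cmd.getStartRun, responseObserver, sessionHolder)
  case proto.PipelineCommand.CommandTypeCase.DEFINE_SQL_GRAPH_ELEMENTS =>
    defineSqlGraphElements(cmd.getDefineSqlGraphElements, sessionHolder.session)
  case other =>
    throw new UnsupportedOperationException(s"$other not supported")
}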

CREATE_DATAFLOW_GRAPH

handlePipelinesCommand creates a dataflow graph and sends the graph ID back.

DROP_DATAFLOW_GRAPH

handlePipelinesCommand...FIXME

DEFINE_DATASET

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines dataset cmd received: [cmd]

handlePipelinesCommand defines a dataset.

DEFINE_FLOW

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines flow cmd received: [cmd]

handlePipelinesCommand defines a flow.

START_RUN

handlePipelinesCommand prints out the following INFO message to the logs:

Start pipeline cmd received: [cmd]

handlePipelinesCommand starts a pipeline run.

DEFINE_SQL_GRAPH_ELEMENTS

handlePipelinesCommand...FIXME

Start Pipeline Run

startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit

startRun is used when PipelinesHandler is requested to handle proto.PipelineCommand.CommandTypeCase.START_RUN command.

startRun finds the GraphRegistrationContext by dataflowGraphId in the DataflowGraphRegistry (in the given SessionHolder).

startRun creates a PipelineEventSender to send pipeline execution progress events back to the Spark Connect client (Python pipeline runtime).

startRun creates a PipelineUpdateContextImpl (with the PipelineEventSender).

In the end, startRun requests the PipelineUpdateContextImpl for the PipelineExecution and either dry-runs the pipeline (for a dry-run command) or runs it (for a run command).
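
Put together, startRun boils down to the outline below. This is a simplified sketch based on the description above, not the verbatim source: the registry lookup (getDataflowGraphOrThrow), the graph accessor (toDataflowGraph), and the execution methods (dryRunPipeline, runPipeline) are assumed names.

// Look up the graph definition registered by the earlier DEFINE_* commands
val graphCtx = sessionHolder.dataflowGraphRegistry
  .getDataflowGraphOrThrow(cmd.getDataflowGraphId)
// Stream pipeline progress events back to the Spark Connect client
val eventSender = new PipelineEventSender(responseObserver, sessionHolder)
val updateCtx = new PipelineUpdateContextImpl(graphCtx.toDataflowGraph, eventSender)
// A dry run only validates the graph; a regular run executes it
if (cmd.getDryRun) updateCtx.pipelineExecution.dryRunPipeline()
else updateCtx.pipelineExecution.runPipeline()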

Create Dataflow Graph

createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String

createDataflowGraph finds the catalog and the database in the given cmd command and creates a dataflow graph.

createDataflowGraph returns the ID of the created dataflow graph.
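
In code form, a minimal sketch (the proto field accessors getDefaultCatalog and getDefaultDatabase, and the registry's createDataflowGraph method, are assumed names):

// Resolve the default catalog and database for the graph's datasets
val defaultCatalog = cmd.getDefaultCatalog
val defaultDatabase = cmd.getDefaultDatabase
// Register a new GraphRegistrationContext and return its ID to the caller
sessionHolder.dataflowGraphRegistry.createDataflowGraph(
  defaultCatalog,
  defaultDatabase,
  defaultSqlConf = Map.empty)  // SQL confs elided in this sketch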

defineSqlGraphElements

defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit

defineSqlGraphElements looks up the GraphRegistrationContext for the dataflow graph ID (from the given DefineSqlGraphElements command and in the given SessionHolder).

defineSqlGraphElements creates a new SqlGraphRegistrationContext (for the GraphRegistrationContext) to process the SQL definition file.
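
A minimal sketch of the two steps (lookupGraph is a hypothetical stand-in for the registry lookup; processSqlFile and the proto accessors are assumed names):

// Find the graph the SQL definitions belong to
val graphCtx = lookupGraph(cmd.getDataflowGraphId)  // hypothetical helper
// Parse the SQL definition file and register the graph elements it defines
new SqlGraphRegistrationContext(graphCtx)
  .processSqlFile(cmd.getSqlText, cmd.getSqlFilePath, session)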

Define Dataset (Table or View)

defineDataset(
  dataset: proto.PipelineCommand.DefineDataset,
  sparkSession: SparkSession): Unit

defineDataset looks up the GraphRegistrationContext for the given dataset (or throws a SparkException if not found).

defineDataset branches off based on the dataset type:

| Dataset Type | Action |
|---|---|
| MATERIALIZED_VIEW or TABLE | Registers a table |
| TEMPORARY_VIEW | Registers a view |

For unknown types, defineDataset reports an IllegalArgumentException:

Unknown dataset type: [type]
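
The branching is a plain pattern match on the dataset type. A simplified sketch (lookupGraph is a hypothetical stand-in for the registry lookup; the proto enum accessors and the registration methods registerTable and registerView are assumed names; dataset construction details are elided):

val graphCtx = lookupGraph(dataset.getDataflowGraphId)  // hypothetical helper
dataset.getDatasetType match {
  case proto.DatasetType.MATERIALIZED_VIEW | proto.DatasetType.TABLE =>
    // Materialized views and tables are both registered as tables in the graph
    graphCtx.registerTable(/* Table built from the proto message */)
  case proto.DatasetType.TEMPORARY_VIEW =>
    graphCtx.registerView(/* TemporaryView built from the proto message */)
  case other =>
    throw new IllegalArgumentException(s"Unknown dataset type: $other")
}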

Define Flow

defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit

defineFlow is used to handle DEFINE_FLOW.

defineFlow looks up the GraphRegistrationContext for the given flow (or throws a SparkException if not found).

Implicit Flows

Implicit flows are flows named after their target dataset, i.e. flows defined as part of dataset creation.

Implicit flows can be defined with multi-part identifiers (just like the corresponding datasets).

Multi-part identifiers are composed of catalog, schema and table parts, separated by . (dot), e.g. my_catalog.my_schema.my_table.

defineFlow creates a flow identifier (for the flow name).

AnalysisException

defineFlow reports an AnalysisException if the given flow is not an implicit flow, but is defined with a multi-part identifier.

In the end, defineFlow registers a flow (with a proper FlowFunction).
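
As a whole, defineFlow can be summarized with the sketch below (lookupGraph, parseIdentifier and flowFunction are hypothetical stand-ins for the registry lookup, identifier parsing and FlowFunction construction; the proto accessors and registerFlow are assumed names):

val graphCtx = lookupGraph(flow.getDataflowGraphId)  // hypothetical helper
// An implicit flow carries the name of its target dataset
val isImplicitFlow = flow.getFlowName == flow.getTargetDatasetName
val flowIdentifier = parseIdentifier(flow.getFlowName)  // hypothetical helper
// Only implicit flows may use a multi-part identifier (mirroring their dataset's name)
if (!isImplicitFlow && flowIdentifier.length > 1) {
  throw new AnalysisException("...")  // error message simplified in this sketch
}
// The FlowFunction turns the client's Relation into a LogicalPlan with transformRelationFunc
graphCtx.registerFlow(flowIdentifier, flowFunction(flow, transformRelationFunc))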