PipelinesHandler

PipelinesHandler handles pipeline commands in Spark Connect (on behalf of SparkConnectPlanner, to be precise).

Handle Pipelines Command

handlePipelinesCommand(
  sessionHolder: SessionHolder,
  cmd: proto.PipelineCommand,
  responseObserver: StreamObserver[ExecutePlanResponse],
  transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult

handlePipelinesCommand handles the given pipeline command (cmd) based on its type.

PipelineCommand              Description
CREATE_DATAFLOW_GRAPH        Creates a new Dataflow Graph
DROP_DATAFLOW_GRAPH          Drops a pipeline
DEFINE_DATASET               Defines a dataset
DEFINE_FLOW                  Defines a flow
START_RUN                    Starts a pipeline run (see startRun)
DEFINE_SQL_GRAPH_ELEMENTS    Defines SQL graph elements (see defineSqlGraphElements)
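
The dispatch in the table above can be pictured as a pattern match over the command type. The following is only a minimal sketch: PipelineCommandType, its case objects and PipelinesDispatchSketch are hypothetical stand-ins, not the actual proto.PipelineCommand classes or the PipelinesHandler code, and the returned strings merely label each branch.

```scala
// Hypothetical stand-ins for the command types of proto.PipelineCommand
// (assumption: the real code matches on the protobuf command-type case).
sealed trait PipelineCommandType
object PipelineCommandType {
  case object CreateDataflowGraph extends PipelineCommandType
  case object DropDataflowGraph extends PipelineCommandType
  case object DefineDataset extends PipelineCommandType
  case object DefineFlow extends PipelineCommandType
  case object StartRun extends PipelineCommandType
  case object DefineSqlGraphElements extends PipelineCommandType
}

object PipelinesDispatchSketch {
  import PipelineCommandType._

  // Returns a label of the action taken for every command type.
  // The match is exhaustive, so the compiler warns when a type is missed.
  def describe(cmd: PipelineCommandType): String = cmd match {
    case CreateDataflowGraph    => "Creates a new Dataflow Graph"
    case DropDataflowGraph      => "Drops a pipeline"
    case DefineDataset          => "Defines a dataset"
    case DefineFlow             => "Defines a flow"
    case StartRun               => "Starts a run"
    case DefineSqlGraphElements => "Defines SQL graph elements"
  }
}
```

A sealed trait is used here so that adding a new command type without a matching branch is flagged at compile time.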

handlePipelinesCommand is used when:

  • SparkConnectPlanner is requested to handlePipelineCommand (for PIPELINE_COMMAND command)

Define Dataset Command

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines dataset cmd received: [cmd]

handlePipelinesCommand defines a dataset.

Define Flow Command

handlePipelinesCommand prints out the following INFO message to the logs:

Define pipelines flow cmd received: [cmd]

handlePipelinesCommand defines a flow.

startRun

startRun(
  cmd: proto.PipelineCommand.StartRun,
  responseObserver: StreamObserver[ExecutePlanResponse],
  sessionHolder: SessionHolder): Unit

startRun...FIXME

createDataflowGraph

createDataflowGraph(
  cmd: proto.PipelineCommand.CreateDataflowGraph,
  spark: SparkSession): String

createDataflowGraph finds the catalog and the database in the given cmd and creates a dataflow graph.

createDataflowGraph returns the ID of the created dataflow graph.
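
One way to picture the returned ID is as a key into a registry of graph contexts that later commands consult. The sketch below rests on assumptions: GraphRegistrySketch, GraphContext, the UUID-based ID and the IllegalStateException are all hypothetical and do not reflect how Spark actually stores GraphRegistrationContext instances or reports a missing graph.

```scala
import java.util.UUID
import scala.collection.mutable

object GraphRegistrySketch {
  // Hypothetical, simplified stand-in for GraphRegistrationContext
  final case class GraphContext(catalog: String, database: String)

  // Graph contexts keyed by the generated graph ID
  private val graphs = mutable.Map.empty[String, GraphContext]

  // Creates a graph for the given catalog and database and returns its ID
  // (assumption: a random UUID string serves as the ID)
  def createGraph(catalog: String, database: String): String = {
    val id = UUID.randomUUID().toString
    graphs.put(id, GraphContext(catalog, database))
    id
  }

  // Looks up a graph context by ID or fails when the ID is unknown
  // (IllegalStateException stands in for whatever the real code throws)
  def lookup(id: String): GraphContext =
    graphs.getOrElse(
      id,
      throw new IllegalStateException(s"Dataflow graph with id $id could not be found"))
}
```

For example, `GraphRegistrySketch.lookup(GraphRegistrySketch.createGraph("spark_catalog", "default"))` gives back the context that was just registered.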

defineSqlGraphElements

defineSqlGraphElements(
  cmd: proto.PipelineCommand.DefineSqlGraphElements,
  session: SparkSession): Unit

defineSqlGraphElements...FIXME

Define Dataset (Table or View)

defineDataset(
  dataset: proto.PipelineCommand.DefineDataset,
  sparkSession: SparkSession): Unit

defineDataset looks up the GraphRegistrationContext for the given dataset (or throws a SparkException if not found).

defineDataset branches off based on the dataset type:

Dataset Type                 Action
MATERIALIZED_VIEW or TABLE   Registers a table
TEMPORARY_VIEW               Registers a view

For unknown types, defineDataset reports an IllegalArgumentException:

Unknown dataset type: [type]
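
The branching on the dataset type, including the error for unknown types, could look roughly like the sketch below. DefineDatasetSketch, Registration, TableRegistered and ViewRegistered are hypothetical names, and dataset types are modeled as plain strings rather than the protobuf enum the real code receives.

```scala
object DefineDatasetSketch {
  // Hypothetical outcomes of registering a dataset
  sealed trait Registration
  final case class TableRegistered(name: String) extends Registration
  final case class ViewRegistered(name: String) extends Registration

  // Chooses the registration based on the dataset type
  // (assumption: types are plain strings here, not a protobuf enum)
  def register(datasetType: String, name: String): Registration =
    datasetType match {
      case "MATERIALIZED_VIEW" | "TABLE" => TableRegistered(name)
      case "TEMPORARY_VIEW"              => ViewRegistered(name)
      case other =>
        throw new IllegalArgumentException(s"Unknown dataset type: $other")
    }
}
```

With this sketch, `DefineDatasetSketch.register("TABLE", "sales")` yields a table registration, while any unrecognized type fails with the message shown above.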

defineFlow

defineFlow(
  flow: proto.PipelineCommand.DefineFlow,
  transformRelationFunc: Relation => LogicalPlan,
  sparkSession: SparkSession): Unit

defineFlow looks up the GraphRegistrationContext for the given flow (or throws a SparkException if not found).

Implicit Flows

An implicit flow is a flow with the name of the target dataset (i.e. one defined as part of dataset creation).

defineFlow creates a flow identifier (for the flow name).

AnalysisException

defineFlow reports an AnalysisException if the given flow is not an implicit flow, but is defined with a multi-part identifier.

In the end, defineFlow registers a flow.
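
The implicit-flow rule and the multi-part identifier check can be illustrated with a small sketch. Everything in it is an assumption made for illustration: DefineFlowSketch and MultipartFlowNameException are hypothetical (the latter stands in for Spark's AnalysisException), and the identifier is split naively on dots, whereas the real code would rely on Spark's own identifier parsing.

```scala
object DefineFlowSketch {
  // Hypothetical stand-in for Spark's AnalysisException
  final class MultipartFlowNameException(message: String)
      extends RuntimeException(message)

  // A flow is implicit when it carries the name of its target dataset
  def isImplicit(flowName: String, targetDataset: String): Boolean =
    flowName == targetDataset

  // Returns the name parts of the flow identifier, rejecting explicit
  // (non-implicit) flows that use a multi-part name
  // (assumption: naive split on '.', no support for quoted identifiers)
  def flowNameParts(flowName: String, targetDataset: String): Seq[String] = {
    val parts = flowName.split('.').toSeq
    if (!isImplicit(flowName, targetDataset) && parts.length > 1) {
      throw new MultipartFlowNameException(
        s"Flow $flowName is not implicit and must not use a multi-part name")
    }
    parts
  }
}
```

Under these assumptions, a flow named `cat.db.t` targeting the dataset `cat.db.t` is accepted as implicit, a flow named `my_flow` is accepted as a single-part explicit flow, and `db.my_flow` targeting `cat.db.t` is rejected.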