
Spark Declarative Pipelines

Spark Declarative Pipelines (SDP) is a declarative framework for building data processing (ETL) pipelines on Apache Spark in Python and SQL.

Apache Spark 4.1.0-SNAPSHOT

The Declarative Pipelines framework is only available in the development branch of Apache Spark 4.1.0-SNAPSHOT.

Declarative Pipelines has not been released in any Spark version yet.

❯ $SPARK_HOME/bin/pyspark --version
Welcome to
     ____              __
    / __/__  ___ _____/ /__
   _\ \/ _ \/ _ `/ __/  '_/
  /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
     /_/

Using Scala version 2.13.16, OpenJDK 64-Bit Server VM, 17.0.16
Branch master
Compiled by user jacek on 2025-08-04T11:30:08Z
Revision 6ef9a9d340539fc870acca042bd036f33ea995c3
Url https://github.com/apache/spark.git
Type --help for more information.

A Declarative Pipelines project is defined and configured in a pipeline specification file.

A Declarative Pipelines project can be executed with the spark-pipelines shell script.

Declarative Pipelines uses Python decorators to describe tables, views and flows, declaratively.

The definitions of tables, views and flows are registered in DataflowGraphRegistry (with GraphRegistrationContexts by graph IDs). A GraphRegistrationContext is converted into a DataflowGraph when PipelinesHandler is requested to start a pipeline run (when the spark-pipelines script is launched with the run or dry-run command).

Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.

DataflowGraph is the core graph structure in Declarative Pipelines.

Once described, a pipeline can be started (on a PipelineExecution).

Pipeline Specification File

The heart of a Declarative Pipelines project is a pipeline specification file (in YAML format).

In the pipeline specification file, Declarative Pipelines developers specify the files (libraries) with the definitions of tables, views and flows (transformations) in Python and SQL. An SDP project can use both languages simultaneously.

The following fields are supported:

| Field Name | Description |
|---|---|
| name (required) | The name of the pipeline |
| storage (required) | The root storage location of pipeline metadata (e.g., checkpoints for streaming flows). See SPARK-53751 Explicit Checkpoint Location. |
| catalog | The default catalog to register datasets into. Unless specified, PipelinesHandler falls back to the current catalog. |
| database | The default database to register datasets into. Unless specified, PipelinesHandler falls back to the current database. |
| schema | Alias of database. Used unless database is defined. |
| configuration | SparkSession configs. The Spark Pipelines runtime uses the configs to build a new SparkSession when run. spark.sql.connect.serverStacktrace.enabled is hardcoded to always be false. |
| libraries | Globs of includes with transformations in SQL and Python |
Info

Pipeline spec is resolved in pyspark/pipelines/cli.py::unpack_pipeline_spec.

name: hello-spark-pipelines
catalog: default_catalog
schema: default
storage: storage-root
configuration:
  spark.key1: value1
libraries:
  - glob:
      include: transformations/**

Spark Pipelines CLI

The spark-pipelines shell script is the Spark Pipelines CLI (it launches org.apache.spark.deploy.SparkPipelines behind the scenes).

Dataset Types

Declarative Pipelines supports the following dataset types:

  • Append Flows
  • Materialized views that are published to a catalog.
  • Tables that are published to a catalog.
  • Streaming tables
  • Views that are not published to a catalog.

Append Flows

Append Flows can be created with the @dp.append_flow decorator in Python (described below).

Streaming Tables

Streaming tables are tables whose content is produced by one or more streaming flows.

Streaming tables can be created with dp.create_streaming_table or the @dp.table decorator in Python (both described below), as sketched next.
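A minimal Python sketch of this pattern, assuming the dp import alias described below, SparkSession.active() as the way a transformation file obtains the session, and the built-in rate source as a stand-in; all dataset and function names are illustrative:

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# A streaming table declared without a query of its own...
dp.create_streaming_table("events")

# ...whose content is produced by one or more append flows writing to it.
@dp.append_flow(target="events")
def events_from_rate() -> DataFrame:
    return spark.readStream.format("rate").load()

@dp.append_flow(target="events", name="more_events_from_rate")
def more_events_from_rate() -> DataFrame:
    return spark.readStream.format("rate").option("rowsPerSecond", 5).load()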

Spark Connect Only

Declarative Pipelines currently only supports Spark Connect.

$ ./bin/spark-pipelines --conf spark.api.mode=xxx
...
25/08/03 12:33:57 INFO SparkPipelines: --spark.api.mode must be 'connect'. Declarative Pipelines currently only supports Spark Connect.
Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 1
 at org.apache.spark.deploy.SparkPipelines$$anon$1.handle(SparkPipelines.scala:73)
 at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:169)
 at org.apache.spark.deploy.SparkPipelines$$anon$1.<init>(SparkPipelines.scala:58)
 at org.apache.spark.deploy.SparkPipelines$.splitArgs(SparkPipelines.scala:57)
 at org.apache.spark.deploy.SparkPipelines$.constructSparkSubmitArgs(SparkPipelines.scala:43)
 at org.apache.spark.deploy.SparkPipelines$.main(SparkPipelines.scala:37)
 at org.apache.spark.deploy.SparkPipelines.main(SparkPipelines.scala)

Python

Python Import Alias Convention

As of commit 6ab0df9, the convention is to alias the import of Declarative Pipelines in Python as dp (previously sdp).

from pyspark import pipelines as dp

pyspark.pipelines Python Module

The pyspark.pipelines module (in __init__.py) imports the pyspark.pipelines.api module to expose Python functions (incl. decorators) such as append_flow, create_sink, create_streaming_table, materialized_view, table and temporary_view (all described below) to wildcard imports.

Use the following import in your Python code:

from pyspark import pipelines as dp

Python Decorators

Declarative Pipelines uses Python decorators to define tables and views.

| Decorator | Purpose |
|---|---|
| @dp.append_flow | Append-only flows |
| @dp.materialized_view | Materialized views (with supporting flows) |
| @dp.table | Streaming and batch tables (with supporting flows) |
| @dp.temporary_view | Temporary views (with supporting flows) |

@dp.append_flow

append_flow(
    *,
    target: str,
    name: Optional[str] = None,
    spark_conf: Optional[Dict[str, str]] = None,
) -> Callable[[QueryFunction], None]

QueryFunction = Callable[[], DataFrame] is a Python function that takes no arguments and returns a PySpark DataFrame.

Registers an append Flow (in the active GraphElementRegistry).

target is the name of the dataset (destination) this flow writes to.
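A hedged usage sketch: the events target table, the flow name, and the rate source are illustrative, and the target is assumed to be declared elsewhere (e.g., with dp.create_streaming_table).

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# Register an append flow named "rate_to_events" that writes into the "events" dataset.
@dp.append_flow(
    target="events",
    name="rate_to_events",
    spark_conf={"spark.sql.shuffle.partitions": "1"},
)
def rate_to_events() -> DataFrame:
    return spark.readStream.format("rate").load()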

dp.create_sink

create_sink(
    name: str,
    format: str,
    options: Optional[Dict[str, str]] = None,
) -> None

Registers a Sink output in the active GraphElementRegistry.

Not Python Decorator

Unlike the others, create_sink is not a Python decorator (Callable).
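A hedged sketch of registering a sink; the parquet format, the path option, and writing to the sink from an append flow that targets it by name are assumptions for illustration.

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# Register a sink output (format and options are assumptions for illustration).
dp.create_sink(
    name="events_sink",
    format="parquet",
    options={"path": "/tmp/pipelines/events_sink"},
)

# Assumption: an append flow can target the sink by name, just like a table.
@dp.append_flow(target="events_sink")
def events_to_sink() -> DataFrame:
    return spark.readStream.format("rate").load()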

dp.create_streaming_table

create_streaming_table(
    name: str,
    *,
    comment: Optional[str] = None,
    table_properties: Optional[Dict[str, str]] = None,
    partition_cols: Optional[List[str]] = None,
    cluster_by: Optional[List[str]] = None,
    schema: Optional[Union[StructType, str]] = None,
    format: Optional[str] = None,
) -> None

Not Python Decorator

Unlike the others, create_streaming_table is not a Python decorator (Callable).

Registers a StreamingTable dataset (in the active GraphElementRegistry) for Append Flows.
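A hedged sketch showing a few of the optional parameters; the table name, schema and partitioning are illustrative.

from pyspark import pipelines as dp

# Declare a streaming table up front; append flows registered elsewhere produce its content.
dp.create_streaming_table(
    "sensor_readings",
    comment="Raw sensor readings ingested by append flows",
    schema="sensor_id STRING, reading DOUBLE, ts TIMESTAMP",
    partition_cols=["sensor_id"],
)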

@dp.materialized_view

materialized_view(
    query_function: Optional[QueryFunction] = None,
    *,
    name: Optional[str] = None,
    comment: Optional[str] = None,
    spark_conf: Optional[Dict[str, str]] = None,
    table_properties: Optional[Dict[str, str]] = None,
    partition_cols: Optional[List[str]] = None,
    cluster_by: Optional[List[str]] = None,
    schema: Optional[Union[StructType, str]] = None,
    format: Optional[str] = None,
) -> Union[Callable[[QueryFunction], None], None]

Registers a MaterializedView dataset with an accompanying Flow in the active GraphElementRegistry.
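A hedged usage sketch in both the bare and the parameterized decorator forms, assuming SparkSession.active() for the session and illustrative names:

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

spark = SparkSession.active()

# Bare form: the function name becomes the materialized view name.
@dp.materialized_view
def numbers_summary() -> DataFrame:
    return spark.range(100).select(F.count("*").alias("n"), F.sum("id").alias("total"))

# Parameterized form with an explicit name and comment.
@dp.materialized_view(name="numbers_top10", comment="Ten largest ids")
def top_ids() -> DataFrame:
    return spark.range(100).orderBy(F.col("id").desc()).limit(10)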

@dp.table

table(
    query_function: Optional[QueryFunction] = None,
    *,
    name: Optional[str] = None,
    comment: Optional[str] = None,
    spark_conf: Optional[Dict[str, str]] = None,
    table_properties: Optional[Dict[str, str]] = None,
    partition_cols: Optional[List[str]] = None,
    cluster_by: Optional[List[str]] = None,
    schema: Optional[Union[StructType, str]] = None,
    format: Optional[str] = None,
) -> Union[Callable[[QueryFunction], None], None]

Registers a StreamingTable dataset with an accompanying Flow in the active GraphElementRegistry.
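A hedged usage sketch; the table properties and the rate source are illustrative.

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# A streaming table whose single supporting flow is the decorated query function.
@dp.table(comment="Raw rate events", table_properties={"quality": "bronze"})
def rate_events_bronze() -> DataFrame:
    return spark.readStream.format("rate").load()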

@dp.temporary_view

temporary_view(
    query_function: Optional[QueryFunction] = None,
    *,
    name: Optional[str] = None,
    comment: Optional[str] = None,
    spark_conf: Optional[Dict[str, str]] = None,
) -> Union[Callable[[QueryFunction], None], None]

Registers a TemporaryView dataset with an accompanying Flow in the active GraphElementRegistry.
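A hedged usage sketch; referencing the temporary view from a downstream dataset by name via spark.read.table is an assumption here.

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# A temporary view that is not published to a catalog; useful as an intermediate step.
@dp.temporary_view(comment="Ids below 50")
def small_ids() -> DataFrame:
    return spark.range(100).where("id < 50")

# A downstream materialized view reads the temporary view by name (assumption).
@dp.materialized_view
def small_ids_count() -> DataFrame:
    return spark.read.table("small_ids").groupBy().count()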

SQL

Spark Declarative Pipelines supports the SQL language to define data processing pipelines.

Pipeline elements are defined in SQL files that are included as libraries in the pipeline specification file.

SqlGraphRegistrationContext is used on the Spark Connect Server to handle SQL statements (from SQL definition files and Python decorators).

Supported SQL statements include creating materialized views and streaming tables.

A streaming table can be defined without a query, since a streaming table's data can be backed by standalone flows. During pipeline execution, if no query is specified in the create statement itself, it is validated that at least one standalone flow writes to the table.

Demo: Create Virtual Environment for Python Client

uv init hello-spark-pipelines && cd hello-spark-pipelines
export SPARK_HOME=/Users/jacek/oss/spark
uv add --editable $SPARK_HOME/python/packaging/client
uv tree --depth 2
hello-spark-pipelines v0.1.0
└── pyspark-client v4.2.0.dev0
    ├── googleapis-common-protos v1.72.0
    ├── grpcio v1.76.0
    ├── grpcio-status v1.76.0
    ├── numpy v2.3.4
    ├── pandas v2.3.3
    ├── pyarrow v22.0.0
    ├── pyyaml v6.0.3
    └── zstandard v0.25.0
uv pip list
Package                  Version     Editable project location
------------------------ ----------- ----------------------------------------------
googleapis-common-protos 1.72.0
grpcio                   1.76.0
grpcio-status            1.76.0
numpy                    2.3.4
pandas                   2.3.3
protobuf                 6.33.1
pyarrow                  22.0.0
pyspark-client           4.2.0.dev0  /Users/jacek/oss/spark/python/packaging/client
python-dateutil          2.9.0.post0
pytz                     2025.2
pyyaml                   6.0.3
six                      1.17.0
typing-extensions        4.15.0
tzdata                   2025.2
zstandard                0.25.0

Activate (source) the virtual environment (that uv helped us create).

source .venv/bin/activate

This activation makes all the necessary Spark Declarative Pipelines Python dependencies (only available in source format) visible to non-uv tools and CLIs, incl. the Spark Pipelines CLI itself.

$SPARK_HOME/bin/spark-pipelines --help
usage: cli.py [-h] {run,dry-run,init} ...

Pipelines CLI

positional arguments:
  {run,dry-run,init}
    run               Run a pipeline. If no refresh options specified, a
                      default incremental update is performed.
    dry-run           Launch a run that just validates the graph and checks
                      for errors.
    init              Generate a sample pipeline project, with a spec file and
                      example transformations.

options:
  -h, --help          show this help message and exit

macOS and PYSPARK_PYTHON

On macOS, you may want to define the PYSPARK_PYTHON environment variable to point at Python >= 3.10.

export PYSPARK_PYTHON=python3.14

Demo: Python API

Activate Virtual Environment

Follow Demo: Create Virtual Environment for Python Client before getting started with this demo.

In a terminal, start a Spark Connect Server.

./sbin/start-connect-server.sh

It will listen on port 15002.

Monitor Logs
tail -f logs/*org.apache.spark.sql.connect.service.SparkConnectServer*.out

Start a Spark Connect-enabled PySpark shell.

$SPARK_HOME/bin/pyspark --remote sc://localhost:15002
from pyspark.pipelines.spark_connect_pipeline import create_dataflow_graph
dataflow_graph_id = create_dataflow_graph(
  spark,
  default_catalog=None,
  default_database=None,
  sql_conf=None,
)

# >>> print(dataflow_graph_id)
# 3cb66d5a-0621-4f15-9920-e99020e30e48
from pyspark.pipelines.spark_connect_graph_element_registry import SparkConnectGraphElementRegistry
registry = SparkConnectGraphElementRegistry(spark, dataflow_graph_id)
from pyspark import pipelines as dp
from pyspark.pipelines.graph_element_registry import graph_element_registration_context
with graph_element_registration_context(registry):
  dp.create_streaming_table("demo_streaming_table")

You should see the following INFO message in the logs of the Spark Connect Server:

INFO PipelinesHandler: Define pipelines dataset cmd received: define_dataset {
  dataflow_graph_id: "3cb66d5a-0621-4f15-9920-e99020e30e48"
  dataset_name: "demo_streaming_table"
  dataset_type: TABLE
}

Demo: spark-pipelines CLI

Activate Virtual Environment

Follow Demo: Create Virtual Environment for Python Client before getting started with this demo.

1️⃣ Display Pipelines Help

Run spark-pipelines --help to learn the options.

$SPARK_HOME/bin/spark-pipelines --help
usage: cli.py [-h] {run,dry-run,init} ...

Pipelines CLI

positional arguments:
  {run,dry-run,init}
    run               Run a pipeline. If no refresh options specified, a
                      default incremental update is performed.
    dry-run           Launch a run that just validates the graph and checks
                      for errors.
    init              Generate a sample pipeline project, including a spec
                      file and example transformations.

options:
  -h, --help          show this help message and exit

2️⃣ Create Pipelines Demo Project

You've only created an empty Python project so far (using uv).

Create a demo hello-spark-pipelines pipelines project with a sample pipeline.yml and sample transformations (in Python and in SQL). spark-pipelines init creates a nested hello-spark-pipelines directory, so move its contents up and remove the leftover directory.

$SPARK_HOME/bin/spark-pipelines init --name hello-spark-pipelines && \
mv hello-spark-pipelines/* . && \
rm -rf hello-spark-pipelines
cat pipeline.yml
name: hello-spark-pipelines
storage: file:///Users/jacek/sandbox/hello-spark-pipelines/hello-spark-pipelines/pipeline-storage
libraries:
  - glob:
      include: transformations/**
tree transformations
transformations
├── example_python_materialized_view.py
└── example_sql_materialized_view.sql

1 directory, 2 files

Spark Connect Server should be down

spark-pipelines dry-run starts its own Spark Connect Server on port 15002 (unless started with the --remote option).

Shut down Spark Connect Server if you started it already.

$SPARK_HOME/sbin/stop-connect-server.sh

--remote option

Use the --remote option to connect to a standalone Spark Connect Server.

$SPARK_HOME/bin/spark-pipelines --remote sc://localhost dry-run

3️⃣ Dry Run Pipelines Project

$SPARK_HOME/bin/spark-pipelines dry-run
Loading pipeline spec from /Users/jacek/sandbox/hello-spark-pipelines/pipeline.yml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: '/Users/jacek/sandbox/hello-spark-pipelines'.
Found 2 files matching glob 'transformations/**/*'
Importing /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_python_materialized_view.py...
Registering SQL file /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_sql_materialized_view.sql...
Starting run...
Run is COMPLETED.

4️⃣ Run Pipelines Project

Run the pipeline.

$SPARK_HOME/bin/spark-pipelines run
Loading pipeline spec from /Users/jacek/sandbox/hello-spark-pipelines/pipeline.yml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: '/Users/jacek/sandbox/hello-spark-pipelines'.
Found 2 files matching glob 'transformations/**/*'
Importing /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_python_materialized_view.py...
Registering SQL file /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_sql_materialized_view.sql...
Starting run...
Flow spark_catalog.default.example_python_materialized_view is QUEUED.
Flow spark_catalog.default.example_sql_materialized_view is QUEUED.
Flow spark_catalog.default.example_python_materialized_view is PLANNING.
Flow spark_catalog.default.example_python_materialized_view is STARTING.
Flow spark_catalog.default.example_python_materialized_view is RUNNING.
Flow spark_catalog.default.example_python_materialized_view has COMPLETED.
Flow spark_catalog.default.example_sql_materialized_view is PLANNING.
Flow spark_catalog.default.example_sql_materialized_view is STARTING.
Flow spark_catalog.default.example_sql_materialized_view is RUNNING.
Flow spark_catalog.default.example_sql_materialized_view has COMPLETED.
Run is COMPLETED.
tree spark-warehouse
spark-warehouse
├── example_python_materialized_view
│   ├── _SUCCESS
│   └── part-00000-284bc03a-3405-4e8e-bbd7-f6f17d79c282-c000.snappy.parquet
└── example_sql_materialized_view
    ├── _SUCCESS
    └── part-00000-8316b6c6-7532-4f7a-92f6-2ec024e069f4-c000.snappy.parquet

3 directories, 4 files

Demo: Scala API

Step 1. Register Dataflow Graph

DataflowGraphRegistry

import org.apache.spark.sql.connect.pipelines.DataflowGraphRegistry

val graphId = DataflowGraphRegistry.createDataflowGraph(
  defaultCatalog=spark.catalog.currentCatalog(),
  defaultDatabase=spark.catalog.currentDatabase,
  defaultSqlConf=Map.empty)

Step 2. Look Up Dataflow Graph

DataflowGraphRegistry

import org.apache.spark.sql.pipelines.graph.GraphRegistrationContext

val graphCtx: GraphRegistrationContext =
  DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId=graphId)

Step 3. Create DataflowGraph

GraphRegistrationContext

import org.apache.spark.sql.pipelines.graph.DataflowGraph

val dp: DataflowGraph = graphCtx.toDataflowGraph

Step 4. Create Update Context

PipelineUpdateContextImpl

import org.apache.spark.sql.pipelines.graph.{ PipelineUpdateContext, PipelineUpdateContextImpl }
import org.apache.spark.sql.pipelines.logging.PipelineEvent

val swallowEventsCallback: PipelineEvent => Unit = _ => ()

val updateCtx: PipelineUpdateContext =
  new PipelineUpdateContextImpl(unresolvedGraph=dp, eventCallback=swallowEventsCallback)

Step 5. Start Pipeline

PipelineExecution

updateCtx.pipelineExecution.runPipeline()

Learning Resources

  1. Spark Declarative Pipelines Programming Guide