Spark Declarative Pipelines¶
Spark Declarative Pipelines (SDP) is a declarative framework for building data processing (ETL) pipelines on Apache Spark in Python and SQL.
A Declarative Pipelines project is defined and configured in a pipeline specification file.
A Declarative Pipelines project can be executed with the spark-pipelines shell script.
Declarative Pipelines uses Python decorators to declaratively describe tables, views and flows.
The definitions of tables, views and flows are registered in DataflowGraphRegistry (as GraphRegistrationContexts keyed by graph ID). A GraphRegistrationContext is converted into a DataflowGraph when PipelinesHandler is requested to start a pipeline run (i.e., when the spark-pipelines script is launched with the run or dry-run command).
Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.
DataflowGraph is the core graph structure in Declarative Pipelines.
Once described, a pipeline can be started (on a PipelineExecution).
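For example, a transformations file could declare a view and a materialized view with the Python decorators along these lines (a minimal sketch; the dataset names are illustrative, and dp.temporary_view and the spark.read.table-based reference are assumptions about the Python API):
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# A view (not published to a catalog); the function body defines its flow
@dp.temporary_view
def raw_numbers() -> DataFrame:
    return spark.range(10)

# A materialized view whose batch flow reads the view above by name
@dp.materialized_view
def even_numbers() -> DataFrame:
    return spark.read.table("raw_numbers").where("id % 2 = 0")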
Configuration Properties¶
spark.sql.pipelines Configuration Properties
Pipeline Specification File¶
A Declarative Pipelines project is defined using a pipeline specification file (in YAML format).
Unless specified using spark-pipelines CLI's --spec option, Declarative Pipelines uses the following file names as the defaults:
- spark-pipeline.yml
- spark-pipeline.yaml
In the pipeline specification file, Declarative Pipelines developers specify the files (libraries) with the definitions of tables, views and flows (transformations) in Python and SQL. An SDP project can use both languages simultaneously.
The following fields are supported:
| Field Name | Description |
|---|---|
| name (required) | The name of the pipeline |
| storage (required) | The root storage location of pipeline metadata (e.g., checkpoints for streaming flows) (SPARK-53751 Explicit Checkpoint Location) |
| catalog | The default catalog to register datasets into. Unless specified, PipelinesHandler falls back to the current catalog. |
| database | The default database to register datasets into. Unless specified, PipelinesHandler falls back to the current database. |
| schema | Alias of database. Used unless database is defined. |
| configuration | SparkSession configs. The Spark Pipelines runtime uses them to build a new SparkSession when run. spark.sql.connect.serverStacktrace.enabled is hardcoded to false. |
| libraries | Globs of includes with transformations in SQL and Python |
Info
Pipeline spec is resolved in pyspark/pipelines/cli.py::unpack_pipeline_spec.
name: hello-spark-pipelines
catalog: default_catalog
schema: default
storage: file:///absolute/path/to/storage/dir
configuration:
spark.key1: value1
libraries:
- glob:
include: transformations/**
Spark Pipelines CLI¶
The spark-pipelines shell script is the Spark Pipelines CLI (it launches org.apache.spark.deploy.SparkPipelines behind the scenes).
Dataset Types¶
Declarative Pipelines supports the following dataset types:
- Append Flows
- Materialized Views
- Streaming tables
- Tables that are published to a catalog.
- Views that are not published to a catalog.
Append Flows¶
Append Flows can be created with the following (see the Python sketch under Streaming Tables below):
- @dp.append_flow (Python)
- CREATE FLOW ... AS INSERT INTO ... BY NAME (SQL)
Materialized Views¶
Materialized Views can be created with the following:
- @dp.materialized_view (Python)
- CREATE MATERIALIZED VIEW ... AS (SQL)
Materialized Views are published to a catalog.
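A minimal Python sketch of a materialized view follows. By default the dataset takes the function name (as in the generated sample project below); the name argument shown here is an assumption about the decorator's parameters.
from pyspark import pipelines as dp
from pyspark.sql import SparkSession

spark = SparkSession.active()

# Published to the pipeline's catalog/database under the given name
@dp.materialized_view(name="numbers_summary")
def summarize_numbers():
    return spark.range(100).selectExpr("count(*) AS cnt", "sum(id) AS total")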
Streaming Tables¶
Streaming tables are tables whose content is produced by one or more streaming flows.
Streaming tables can be created with the following:
- dp.create_streaming_table or CREATE STREAMING TABLE (with no flows; flows can be defined later with @dp.append_flow or CREATE FLOW ... AS INSERT INTO ... BY NAME)
- CREATE STREAMING TABLE AS
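A minimal Python sketch of the first variant (a backing streaming table plus a flow attached to it by name; the rate source and the append_flow target parameter are assumptions for illustration):
from pyspark import pipelines as dp
from pyspark.sql import SparkSession

spark = SparkSession.active()

# The backing streaming table (no flows yet)
dp.create_streaming_table("events")

# A streaming flow that appends to the table above
@dp.append_flow(target="events")
def events_from_rate():
    return spark.readStream.format("rate").load()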
Spark Connect Only¶
Declarative Pipelines currently only supports Spark Connect.
$ ./bin/spark-pipelines --conf spark.api.mode=xxx
...
25/08/03 12:33:57 INFO SparkPipelines: --spark.api.mode must be 'connect'. Declarative Pipelines currently only supports Spark Connect.
Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 1
at org.apache.spark.deploy.SparkPipelines$$anon$1.handle(SparkPipelines.scala:73)
at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:169)
at org.apache.spark.deploy.SparkPipelines$$anon$1.<init>(SparkPipelines.scala:58)
at org.apache.spark.deploy.SparkPipelines$.splitArgs(SparkPipelines.scala:57)
at org.apache.spark.deploy.SparkPipelines$.constructSparkSubmitArgs(SparkPipelines.scala:43)
at org.apache.spark.deploy.SparkPipelines$.main(SparkPipelines.scala:37)
at org.apache.spark.deploy.SparkPipelines.main(SparkPipelines.scala)
Python¶
SQL¶
Demo: Create Virtual Environment for Python Client¶
uv init hello-spark-pipelines && cd hello-spark-pipelines
export SPARK_HOME=/Users/jacek/oss/spark
uv add --editable $SPARK_HOME/python/packaging/client
uv tree --depth 2
hello-spark-pipelines v0.1.0
└── pyspark-client v4.2.0.dev0
├── googleapis-common-protos v1.72.0
├── grpcio v1.76.0
├── grpcio-status v1.76.0
├── numpy v2.3.4
├── pandas v2.3.3
├── pyarrow v22.0.0
├── pyyaml v6.0.3
└── zstandard v0.25.0
uv pip list
Package Version Editable project location
------------------------ ----------- ----------------------------------------------
googleapis-common-protos 1.72.0
grpcio 1.76.0
grpcio-status 1.76.0
numpy 2.3.4
pandas 2.3.3
protobuf 6.33.1
pyarrow 22.0.0
pyspark-client 4.2.0.dev0 /Users/jacek/oss/spark/python/packaging/client
python-dateutil 2.9.0.post0
pytz 2025.2
pyyaml 6.0.3
six 1.17.0
typing-extensions 4.15.0
tzdata 2025.2
zstandard 0.25.0
Activate (source) the virtual environment (that uv helped us create).
source .venv/bin/activate
This activation brings in all the necessary Spark Declarative Pipelines Python dependencies (that are only available in source format) for non-uv tools and CLIs, incl. the Spark Pipelines CLI itself.
$SPARK_HOME/bin/spark-pipelines --help
usage: cli.py [-h] {run,dry-run,init} ...
Pipelines CLI
positional arguments:
{run,dry-run,init}
run Run a pipeline. If no refresh options specified, a
default incremental update is performed.
dry-run Launch a run that just validates the graph and checks
for errors.
init Generate a sample pipeline project, with a spec file and
example transformations.
options:
-h, --help show this help message and exit
macOS and PYSPARK_PYTHON
On macOS, you may want to define the PYSPARK_PYTHON environment variable to point at Python >= 3.10.
export PYSPARK_PYTHON=python3.14
Demo: Python API¶
Activate Virtual Environment
Follow Demo: Create Virtual Environment for Python Client before getting started with this demo.
In a terminal, start a Spark Connect Server.
./sbin/start-connect-server.sh
It will listen on port 15002.
Monitor Logs
tail -f logs/*org.apache.spark.sql.connect.service.SparkConnectServer*.out
Start a Spark Connect-enabled PySpark shell.
$SPARK_HOME/bin/pyspark --remote sc://localhost:15002
from pyspark.pipelines.spark_connect_pipeline import create_dataflow_graph
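# Create (register) a new dataflow graph on the Spark Connect Server and get its ID back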
dataflow_graph_id = create_dataflow_graph(
spark,
default_catalog=None,
default_database=None,
sql_conf=None,
)
# >>> print(dataflow_graph_id)
# 3cb66d5a-0621-4f15-9920-e99020e30e48
from pyspark.pipelines.spark_connect_graph_element_registry import SparkConnectGraphElementRegistry
registry = SparkConnectGraphElementRegistry(spark, dataflow_graph_id)
from pyspark import pipelines as dp
from pyspark.pipelines.graph_element_registry import graph_element_registration_context
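# Datasets and flows defined inside this context are sent to the Spark Connect Server and recorded in the dataflow graph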
with graph_element_registration_context(registry):
dp.create_streaming_table("demo_streaming_table")
You should see the following INFO message in the logs of the Spark Connect Server:
INFO PipelinesHandler: Define pipelines dataset cmd received: define_dataset {
dataflow_graph_id: "3cb66d5a-0621-4f15-9920-e99020e30e48"
dataset_name: "demo_streaming_table"
dataset_type: TABLE
}
Demo: spark-pipelines CLI¶
Activate Virtual Environment
Follow Demo: Create Virtual Environment for Python Client before getting started with this demo.
1️⃣ Display Pipelines Help¶
Run spark-pipelines --help to learn the options.
$SPARK_HOME/bin/spark-pipelines --help
usage: cli.py [-h] {run,dry-run,init} ...
Pipelines CLI
positional arguments:
{run,dry-run,init}
run Run a pipeline. If no refresh options specified, a
default incremental update is performed.
dry-run Launch a run that just validates the graph and checks
for errors.
init Generate a sample pipeline project, including a spec
file and example transformations.
options:
-h, --help show this help message and exit
2️⃣ Create Pipelines Demo Project¶
You've only created an empty Python project so far (using uv).
Create a demo hello-spark-pipelines pipelines project with a sample spark-pipeline.yml and sample transformations (in Python and SQL). Since spark-pipelines init generates a nested hello-spark-pipelines directory inside the current project, move the generated files up to the project root and remove the nested directory.
$SPARK_HOME/bin/spark-pipelines init --name hello-spark-pipelines && \
mv hello-spark-pipelines/* . && \
rm -rf hello-spark-pipelines
cat spark-pipeline.yml
name: hello-spark-pipelines
storage: file:///Users/jacek/sandbox/hello-spark-pipelines/hello-spark-pipelines/pipeline-storage
libraries:
- glob:
include: transformations/**
tree transformations
transformations
├── example_python_materialized_view.py
└── example_sql_materialized_view.sql
1 directory, 2 files
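For reference, the generated Python transformation is a single materialized view along these lines (the exact template may differ across Spark versions):
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.active()

# A materialized view named after the function (compare with the run output below)
@dp.materialized_view
def example_python_materialized_view() -> DataFrame:
    return spark.range(10)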
Spark Connect Server should be down
spark-pipelines dry-run starts its own Spark Connect Server on port 15002 (unless started with the --remote option).
Shut down Spark Connect Server if you started it already.
$SPARK_HOME/sbin/stop-connect-server.sh
--remote option
Use the --remote option to connect to a standalone Spark Connect Server.
$SPARK_HOME/bin/spark-pipelines --remote sc://localhost dry-run
3️⃣ Dry Run Pipelines Project¶
$SPARK_HOME/bin/spark-pipelines dry-run
Loading pipeline spec from /Users/jacek/sandbox/hello-spark-pipelines/spark-pipeline.yml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: '/Users/jacek/sandbox/hello-spark-pipelines'.
Found 2 files matching glob 'transformations/**/*'
Importing /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_python_materialized_view.py...
Registering SQL file /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_sql_materialized_view.sql...
Starting run...
Run is COMPLETED.
4️⃣ Run Pipelines Project¶
Run the pipeline.
$SPARK_HOME/bin/spark-pipelines run
Loading pipeline spec from /Users/jacek/sandbox/hello-spark-pipelines/spark-pipeline.yml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: '/Users/jacek/sandbox/hello-spark-pipelines'.
Found 2 files matching glob 'transformations/**/*'
Importing /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_python_materialized_view.py...
Registering SQL file /Users/jacek/sandbox/hello-spark-pipelines/transformations/example_sql_materialized_view.sql...
Starting run...
Flow spark_catalog.default.example_python_materialized_view is QUEUED.
Flow spark_catalog.default.example_sql_materialized_view is QUEUED.
Flow spark_catalog.default.example_python_materialized_view is PLANNING.
Flow spark_catalog.default.example_python_materialized_view is STARTING.
Flow spark_catalog.default.example_python_materialized_view is RUNNING.
Flow spark_catalog.default.example_python_materialized_view has COMPLETED.
Flow spark_catalog.default.example_sql_materialized_view is PLANNING.
Flow spark_catalog.default.example_sql_materialized_view is STARTING.
Flow spark_catalog.default.example_sql_materialized_view is RUNNING.
Flow spark_catalog.default.example_sql_materialized_view has COMPLETED.
Run is COMPLETED.
tree spark-warehouse
spark-warehouse
├── example_python_materialized_view
│ ├── _SUCCESS
│ └── part-00000-284bc03a-3405-4e8e-bbd7-f6f17d79c282-c000.snappy.parquet
└── example_sql_materialized_view
├── _SUCCESS
└── part-00000-8316b6c6-7532-4f7a-92f6-2ec024e069f4-c000.snappy.parquet
3 directories, 4 files
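As an optional sanity check (not part of the walkthrough), the Parquet output can be inspected with any Spark session, e.g. a local (non-Connect) PySpark session started from the project root:
# Assumes a plain pyspark installation and the project root as the working directory
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("inspect-output").getOrCreate()
spark.read.parquet("spark-warehouse/example_python_materialized_view").show()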
Demo: Scala API¶
Step 1. Register Dataflow Graph¶
import org.apache.spark.sql.connect.pipelines.DataflowGraphRegistry
val graphId = DataflowGraphRegistry.createDataflowGraph(
defaultCatalog=spark.catalog.currentCatalog(),
defaultDatabase=spark.catalog.currentDatabase,
defaultSqlConf=Map.empty)
Step 2. Look Up Dataflow Graph¶
import org.apache.spark.sql.pipelines.graph.GraphRegistrationContext
val graphCtx: GraphRegistrationContext =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId=graphId)
Step 3. Create DataflowGraph¶
import org.apache.spark.sql.pipelines.graph.DataflowGraph
val dp: DataflowGraph = graphCtx.toDataflowGraph
Step 4. Create Update Context¶
import org.apache.spark.sql.pipelines.graph.{ PipelineUpdateContext, PipelineUpdateContextImpl }
import org.apache.spark.sql.pipelines.logging.PipelineEvent
val swallowEventsCallback: PipelineEvent => Unit = _ => ()
val updateCtx: PipelineUpdateContext =
new PipelineUpdateContextImpl(unresolvedGraph=dp, eventCallback=swallowEventsCallback)
Step 5. Start Pipeline¶
updateCtx.pipelineExecution.runPipeline()