Skip to content


DataFrameWriter[T] is a high-level API for Spark SQL developers to describe "write path" of a structured query (over rows of T type).

DataFrameWriter is used to describe an output node in a data processing graph.

DataFrameWriter is used to describe the output data source format to be used to "save" data to a data source (e.g. files, Hive tables, JDBC or Dataset[String]).

DataFrameWriter ends description of a write specification and does trigger a Spark job (unlike DataFrameWriter).

DataFrameWriter is available using Dataset.write operator.

Creating Instance

DataFrameWriter takes the following to be created:



val writer = df.write

import org.apache.spark.sql.DataFrameWriter


When created, DataFrameWriter converts the Dataset to a DataFrame.

Name of Data Source

source: String

source is a short name (alias) or a fully-qualified class name to identify the data source to write data to.

source can be specified using format method:

  source: String): DataFrameWriter[T]

Default: spark.sql.sources.default configuration property


  tableName: String): Unit

insertInto requests the DataFrame for the SparkSession.

insertInto tries to look up the TableProvider for the data source.

insertInto requests the ParserInterface to parse the tableName identifier (possibly multi-part).

In the end, insertInto uses the modern or the legacy insert paths based on...FIXME

DataFrameWrite.insertInto Executes SQL Command (as a Spark job)

insertInto asserts that write is not bucketed (with insertInto operation name).


saveAsTable and insertInto are structurally alike.

Modern Insert Path (CatalogPlugin)

  catalog: CatalogPlugin,
  ident: Identifier): Unit


Legacy Insert Path (TableIdentifier)

  tableIdent: TableIdentifier): Unit

insertInto creates an InsertIntoStatement logical operator (with overwrite flag enabled when SaveMode is Overwrite).

In the end, insertInto executes the InsertIntoStatement logical command.


insertInto throws an AnalysisException when the partitioningColumns are defined:

insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().


  tableName: String): Unit

saveAsTable requests the DataFrame for the SparkSession.

saveAsTable tries to look up the TableProvider for the data source.

saveAsTable requests the ParserInterface to parse the tableName identifier (possibly multi-part).

In the end, saveAsTable uses the modern or the legacy save paths based on...FIXME


saveAsTable and insertInto are structurally alike.

Modern saveAsTable with TableCatalog

  catalog: TableCatalog,
  ident: Identifier,
  nameParts: Seq[String]): Unit

Legacy saveAsTable with TableIdentifier

  tableIdent: TableIdentifier): Unit

saveAsTable saves the content of a DataFrame to the tableName table.


saveAsTable throws an AnalysisException when no catalog could handle the table identifier:

Couldn't find a catalog to handle the identifier [tableName].


val ids = spark.range(5)
  option("path", "/tmp/five_ids").

// Check out if saveAsTable as five_ids was successful
val q = spark.catalog.listTables.filter($"name" === "five_ids")
|    name|database|description|tableType|isTemporary|
|five_ids| default|       null| EXTERNAL|      false|

Writing Out Data (save)

save(): Unit
  path: String): Unit

Saves a DataFrame (the result of executing a structured query) to a data source.

Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).


save uses SparkSession to access the SessionState and in turn the SQLConf.

val df: DataFrame = ???


save throws an AnalysisException when requested to save to Hive data source (the source is hive):

Hive data source can only be used with tables, you can not write files of Hive data source directly.

save throws an AnalysisException when bucketing is used (the numBuckets or sortColumnNames options are defined):

'[operation]' does not support bucketing right now


  path: Option[String]): Unit


Looking up TableProvider

lookupV2Provider(): Option[TableProvider]

lookupV2Provider tries to look up a TableProvider for the source.

lookupV2Provider explicitly excludes FileDataSourceV2-based data sources (due to SPARK-28396).

lookupV2Provider is used when:

Save Mode

  saveMode: SaveMode): DataFrameWriter[T]
  saveMode: String): DataFrameWriter[T]

mode defines the behaviour of save when an external file or table Spark writes to already exists.

Name Behaviour
Append Records are appended to an existing data
ErrorIfExists Exception is thrown if the target exists
Ignore Do not save the records and not change the existing data in any way
Overwrite Existing data is overwritten by new records

Creating BucketSpec

getBucketSpec: Option[BucketSpec]

getBucketSpec creates a new BucketSpec for numBuckets if defined (with bucketColumnNames and sortColumnNames).


getBucketSpec throws an IllegalArgumentException when numBuckets are not defined but sortColumnNames are.

sortBy must be used together with bucketBy

getBucketSpec is used when:


partitioningAsV2: Seq[Transform]

partitioningAsV2 creates Transforms based on the partitioningColumns (IdentityTransforms) and getBucketSpec (a BucketTransform), if defined.

partitioningAsV2 is used when:

Executing Logical Command for Writing to Data Source V1

saveToV1Source(): Unit

saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).


While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.

In the end, saveToV1Source runs the logical command for writing.

saveToV1Source is used when:

Executing Logical Command(s)

  session: SparkSession,
  name: String)(
  command: LogicalPlan): Unit

runCommand uses the given SparkSession to access the SessionState that is in turn requested to execute the logical command (that creates a QueryExecution).

runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).


Use web UI's SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.

runCommand records the current time (end time).

In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).

In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.

runCommand is used when:

Creating Table

  tableIdent: TableIdentifier): Unit

createTable builds a CatalogStorageFormat per extraOptions.

createTable assumes the table being external when location URI of CatalogStorageFormat is defined, and managed otherwise.

createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.

createTable is used when: