
WriteIntoDelta Command

WriteIntoDelta is a delta command that transactionally writes data (a DataFrame) out to a delta table (one that already exists or is about to be created).

WriteIntoDelta is a LeafRunnableCommand (Spark SQL) logical operator.

Creating Instance

WriteIntoDelta takes the following to be created:

WriteIntoDelta is created when:

Configuration

WriteIntoDelta is given a configuration when created as follows:

schemaInCatalog

schemaInCatalog: Option[StructType] = None

WriteIntoDelta can be given a StructType (Spark SQL) when created. Unless given, it is assumed undefined.

schemaInCatalog is only defined when DeltaCatalog is requested to create a delta table (yet it does not seem to be used at all).

ImplicitMetadataOperation

WriteIntoDelta is an operation that can update metadata (schema and partitioning) of the delta table.

Executing Command

RunnableCommand
run(
  sparkSession: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run requests the DeltaLog to start a new transaction.

run writes the data out and requests the OptimisticTransaction to commit (with a DeltaOperations.Write operation that carries the SaveMode, partition columns, replaceWhere and userMetadata).
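The order of the steps in run (start a transaction, write, commit) can be illustrated with a minimal, self-contained sketch. ToyLog, ToyTxn and WriteOp are hypothetical stand-ins invented for this illustration and are not Delta Lake classes; actions are modelled as plain strings.

```scala
object RunFlowSketch {
  // Hypothetical stand-in for the DeltaOperations.Write payload described above.
  final case class WriteOp(
    mode: String,
    partitionBy: Seq[String],
    replaceWhere: Option[String],
    userMetadata: Option[String])

  // Hypothetical transaction that only records what was committed.
  final class ToyTxn(val readVersion: Long) {
    var committed: Option[(Seq[String], WriteOp)] = None
    def commit(actions: Seq[String], op: WriteOp): Unit = {
      committed = Some((actions, op))
    }
  }

  // Hypothetical transaction log; -1 means the table does not exist yet.
  final class ToyLog(currentVersion: Long) {
    def startTransaction(): ToyTxn = new ToyTxn(currentVersion)
  }

  // Mirrors the described flow: start a transaction, write, then commit.
  def run(log: ToyLog, op: WriteOp)(write: ToyTxn => Seq[String]): ToyTxn = {
    val txn = log.startTransaction()
    val actions = write(txn)
    txn.commit(actions, op)
    txn
  }
}
```

The write step is passed in as a function so that the sketch shows only the ordering, not how files are produced.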

Writing Out Data

write(
  txn: OptimisticTransaction,
  sparkSession: SparkSession): Seq[Action]

write checks whether the write operation targets a delta table that already exists. If so (i.e. the readVersion of the transaction is above -1), write branches per the SaveMode:

  • For ErrorIfExists, write throws an AnalysisException.

    [path] already exists.
    
  • For Ignore, write does nothing and returns back with no Actions.

  • For Overwrite, write requests the DeltaLog to assert that it is removable.
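The branching above can be sketched as a small, self-contained decision function. Mode and Outcome are hypothetical types introduced for this illustration (they are not Spark's SaveMode or any Delta Lake type), and the handling of Append (simply proceed) is an assumption, since it is not covered by the list.

```scala
object SaveModeBranching {
  // Hypothetical stand-ins for the save modes (not Spark's SaveMode enum).
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // Hypothetical outcomes of the check made before any data is written.
  sealed trait Outcome
  case object Proceed extends Outcome
  case object SkipWrite extends Outcome
  case object AssertRemovableThenProceed extends Outcome
  final case class Fail(message: String) extends Outcome

  def decide(readVersion: Long, mode: Mode, path: String): Outcome = {
    // A readVersion above -1 means the table already exists.
    val tableExists = readVersion > -1
    if (!tableExists) Proceed
    else mode match {
      case ErrorIfExists => Fail(s"$path already exists.")
      case Ignore        => SkipWrite                  // no Actions are returned
      case Overwrite     => AssertRemovableThenProceed
      case Append        => Proceed                    // assumption: no extra check
    }
  }
}
```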

write updates the metadata (updateMetadata with the rearrangeOnly option).

write...FIXME


write is used when the following commands are executed:

extractConstraints

extractConstraints(
  sparkSession: SparkSession,
  expr: Seq[Expression]): Seq[Constraint]

For every Expression (in the given expr), extractConstraints checks whether it contains an UnresolvedAttribute. If so, extractConstraints creates a Check constraint with the following:

Property      Value
name          EXPRESSION(expression)
expression    The Expression being handled

Note: extractConstraints is a noop (returns no Constraints) when spark.databricks.delta.replaceWhere.constraintCheck.enabled is disabled.
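The behaviour of extractConstraints can be sketched with a tiny, self-contained expression model. Expr, UnresolvedAttr, Lit, GreaterThan and Check are hypothetical stand-ins for the Catalyst and Delta Lake types, and the constraintCheckEnabled parameter stands in for the configuration property, which the real method presumably reads from the SparkSession.

```scala
object ExtractConstraintsSketch {
  // Hypothetical mini expression tree standing in for Catalyst's Expression.
  sealed trait Expr { def children: Seq[Expr] }
  final case class UnresolvedAttr(name: String) extends Expr {
    val children: Seq[Expr] = Seq.empty
  }
  final case class Lit(value: Any) extends Expr {
    val children: Seq[Expr] = Seq.empty
  }
  final case class GreaterThan(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  // Hypothetical stand-in for the Check constraint (name + expression).
  final case class Check(name: String, expression: Expr)

  // True if the expression or any of its descendants is an unresolved attribute.
  def hasUnresolvedAttr(e: Expr): Boolean = e match {
    case _: UnresolvedAttr => true
    case other             => other.children.exists(hasUnresolvedAttr)
  }

  def extractConstraints(constraintCheckEnabled: Boolean, exprs: Seq[Expr]): Seq[Check] =
    if (!constraintCheckEnabled) Seq.empty // noop when the check is disabled
    else exprs.filter(hasUnresolvedAttr).map(e => Check(s"EXPRESSION($e)", e))
}
```

Expressions without any unresolved attribute (such as a bare literal) produce no constraint in this sketch.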

removeFiles

removeFiles(
  spark: SparkSession,
  txn: OptimisticTransaction,
  condition: Seq[Expression]): Seq[Action]

removeFiles...FIXME

Demo

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.delta.commands.WriteIntoDelta

// Path of the delta table (it does not have to exist yet)
val tableName = "/tmp/delta/t1"
// The data to write out: a single id column with values 0 to 4
val data = spark.range(5).toDF
val writeCmd = WriteIntoDelta(
  deltaLog = DeltaLog.forTable(spark, tableName),
  mode = SaveMode.Overwrite,
  options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
  partitionColumns = Seq.empty[String],
  configuration = Map.empty[String, String],
  data = data)

// Review web UI @ http://localhost:4040

// Executes the command: starts a transaction, writes the data out and commits
writeCmd.run(spark)

canOverwriteSchema

ImplicitMetadataOperation
canOverwriteSchema: Boolean

canOverwriteSchema is part of the ImplicitMetadataOperation abstraction.

canOverwriteSchema is true when all the following hold:

  1. The canOverwriteSchema option is enabled (true) in the DeltaOptions
  2. This WriteIntoDelta is an overwrite operation
  3. The replaceWhere option is not defined in the DeltaOptions
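The three conditions combine as a plain conjunction, which the following self-contained sketch makes explicit. Options is a hypothetical, simplified stand-in for the relevant part of DeltaOptions, not the real class.

```scala
object CanOverwriteSchemaSketch {
  // Hypothetical, simplified view of the two options that matter here.
  final case class Options(canOverwriteSchema: Boolean, replaceWhere: Option[String])

  // True only when all three conditions hold.
  def canOverwriteSchema(options: Options, isOverwriteOperation: Boolean): Boolean =
    options.canOverwriteSchema && isOverwriteOperation && options.replaceWhere.isEmpty
}
```

For example, an overwrite with the option enabled but with a replaceWhere predicate defined does not qualify, since only part of the table is being replaced.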

isOverwriteOperation

isOverwriteOperation: Boolean

isOverwriteOperation is true when the SaveMode is SaveMode.Overwrite.


isOverwriteOperation is used when:

writeAndReturnCommitData

WriteIntoDeltaLike
writeAndReturnCommitData(
  txn: OptimisticTransaction,
  sparkSession: SparkSession,
  clusterBySpecOpt: Option[ClusterBySpec] = None,
  isTableReplace: Boolean = false): TaggedCommitData[Action]

writeAndReturnCommitData is part of the WriteIntoDeltaLike abstraction.

writeAndReturnCommitData...FIXME

writeFiles

writeFiles(
  txn: OptimisticTransaction,
  data: DataFrame,
  options: DeltaOptions): Seq[FileAction]

writeFiles...FIXME