WriteIntoDelta Command — Writing Data(Frame) Transactionally Into Delta Table

WriteIntoDelta is a Delta command that can write data(frame) transactionally into a delta table.

Demo
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.delta.commands.WriteIntoDelta

// Run in spark-shell with Delta Lake on the classpath (spark is the active SparkSession)
val tableName = "/tmp/delta/t1"
val data = spark.range(5).toDF

// Overwrite the table with no partitioning, no extra options and no table configuration
val writeCmd = WriteIntoDelta(
  deltaLog = DeltaLog.forTable(spark, tableName),
  mode = SaveMode.Overwrite,
  options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
  partitionColumns = Seq.empty[String],
  configuration = Map.empty[String, String],
  data = data)

// Review web UI @ http://localhost:4040

writeCmd.run(spark)
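
To check the result of the demo, the table can be read back with the delta data source. This is a minimal sketch that assumes the same spark-shell session as the demo and that the command has completed successfully; the readBack name is introduced here for illustration only.

```scala
// Assumes a spark-shell session with Delta Lake on the classpath
// and the table written by the demo at /tmp/delta/t1
val readBack = spark.read.format("delta").load("/tmp/delta/t1")

// Display the rows that the WriteIntoDelta command has written
readBack.show()
```

Since the demo uses SaveMode.Overwrite, the table should hold only the rows of the DataFrame that was written last.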

WriteIntoDelta is a logical command (RunnableCommand).

Read up on RunnableCommand in The Internals of Spark SQL online book.

WriteIntoDelta is created when:

Creating WriteIntoDelta Instance

WriteIntoDelta takes the following to be created:

  • DeltaLog

  • SaveMode

  • DeltaOptions

  • Names of the partition columns (Seq[String])

  • Configuration (Map[String, String])

  • Data (DataFrame)

Running Command — run Method

run(
  sparkSession: SparkSession): Seq[Row]

run is part of the RunnableCommand contract to run a command.

run requests the DeltaLog to start a new transaction.

run writes the data out (using write) and requests the OptimisticTransaction to commit the resulting actions.
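
The three steps of run (start a transaction, write, commit) can be sketched with plain Scala stand-ins. This is not the Delta Lake source code: the classes below are simplified, hypothetical models that only borrow the names DeltaLog, OptimisticTransaction and Action, and the commit signature and the file name are assumptions made for illustration.

```scala
object RunFlowSketch {
  // Hypothetical stand-ins for Delta Lake's actions
  sealed trait Action
  final case class AddFile(path: String) extends Action

  // Simplified model of a transaction: remembers the version it read
  final class OptimisticTransaction(val readVersion: Long) {
    var committed: Seq[Action] = Seq.empty

    // Records the actions and returns the version of the new commit
    def commit(actions: Seq[Action]): Long = {
      committed = actions
      readVersion + 1
    }
  }

  // Simplified model of a transaction log at a given version
  // (-1 stands for a table that does not exist yet)
  final class DeltaLog(currentVersion: Long) {
    def startTransaction(): OptimisticTransaction =
      new OptimisticTransaction(readVersion = currentVersion)
  }

  // Stand-in for write: pretends a single data file was written
  def write(txn: OptimisticTransaction): Seq[Action] =
    Seq(AddFile("part-00000.parquet"))

  // The flow described above: start a transaction, write, commit
  def run(deltaLog: DeltaLog): Long = {
    val txn = deltaLog.startTransaction()
    val actions = write(txn)
    txn.commit(actions)
  }
}
```

In this model, RunFlowSketch.run(new RunFlowSketch.DeltaLog(-1L)) commits the first version of a table that did not exist before.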

write Method

write(
  txn: OptimisticTransaction,
  sparkSession: SparkSession): Seq[Action]

write checks whether the write operation targets a delta table that already exists. If so (i.e. the readVersion of the transaction is greater than -1), write branches per the SaveMode:

  • For ErrorIfExists, write throws an AnalysisException:

    [path] already exists.

  • For Ignore, write does nothing.

  • For Overwrite, write requests the DeltaLog to assertRemovable.
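
The branching above can be modelled in plain Scala. This is a hedged sketch rather than the actual implementation: SaveMode is replaced with a local stand-in, the Decision type and the decide function are hypothetical names, and an IllegalStateException stands in for the AnalysisException. The Append case is not covered by the description above, so letting it proceed is an assumption.

```scala
object WriteModeSketch {
  // Local stand-in for org.apache.spark.sql.SaveMode
  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode

  // Hypothetical outcome of the existing-table check
  sealed trait Decision
  case object Proceed extends Decision
  case object Skip extends Decision
  case object AssertRemovableThenProceed extends Decision

  def decide(readVersion: Long, mode: SaveMode, path: String): Decision =
    if (readVersion > -1) {
      // The table already exists, so the save mode decides what happens
      mode match {
        case ErrorIfExists =>
          // Stand-in for the AnalysisException in the description above
          throw new IllegalStateException(s"$path already exists.")
        case Ignore    => Skip
        case Overwrite => AssertRemovableThenProceed
        case Append    => Proceed // assumption: not described above
      }
    } else {
      // No table yet (readVersion is -1): every save mode proceeds
      Proceed
    }
}
```

For example, WriteModeSketch.decide(0L, WriteModeSketch.Ignore, "/tmp/delta/t1") gives Skip, while the same call with a readVersion of -1 gives Proceed.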

write…​FIXME

write is used exclusively when WriteIntoDelta is requested to run.