# WriteIntoDelta Command

WriteIntoDelta is a Delta command that can transactionally write data (a DataFrame) out to a delta table (that may already exist or is about to be created).

WriteIntoDelta is a LeafRunnableCommand (Spark SQL) logical operator.
## Creating Instance

WriteIntoDelta takes the following to be created:

- DeltaLog
- SaveMode
- DeltaOptions
- Names of the partition columns
- Configuration
- Data (DataFrame)
- schemaInCatalog
WriteIntoDelta is created when:

- DeltaDynamicPartitionOverwriteCommand is executed
- DeltaLog is requested to create an insertable HadoopFsRelation (when DeltaDataSource is requested to create a relation as a CreatableRelationProvider or a RelationProvider)
- DeltaCatalog is requested to create a delta table
- WriteIntoDeltaBuilder is requested to build a V1Write
- CreateDeltaTableCommand is executed
- DeltaDataSource is requested to create a relation (for writing) (as a CreatableRelationProvider)
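For example, a plain DataFrameWriter-based batch write goes through DeltaDataSource (as a CreatableRelationProvider) and so ends up creating a WriteIntoDelta under the covers:

```scala
// A batch write with the DataFrameWriter API goes through DeltaDataSource
// (a CreatableRelationProvider), which creates a WriteIntoDelta command
spark.range(5).write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta/t1")
```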
## Configuration

WriteIntoDelta is given a configuration when created as follows:

- Always empty for DeltaLog
- Always empty for DeltaDataSource
- Existing properties of a delta table in DeltaCatalog (with the comment key based on the value in the catalog)
- Existing configuration (of the Metadata of the Snapshot of the DeltaLog) for WriteIntoDeltaBuilder
- Existing properties of a delta table for CreateDeltaTableCommand (with the comment key based on the value in the catalog)
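As a minimal illustration of the DeltaCatalog case, the table properties and the table comment given at CTAS time are what ends up as the configuration (the table below is made up for the example; delta.appendOnly is a real delta table property):

```scala
// CTAS through DeltaCatalog: the TBLPROPERTIES and COMMENT below
// are passed on as the configuration of the underlying WriteIntoDelta
spark.sql("""
  CREATE TABLE demo_table
  USING delta
  COMMENT 'demo delta table'
  TBLPROPERTIES ('delta.appendOnly' = 'false')
  AS SELECT * FROM RANGE(5)
""")
```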
## schemaInCatalog

```scala
schemaInCatalog: Option[StructType] = None
```

WriteIntoDelta can be given a StructType (Spark SQL) when created. Unless given, it is assumed undefined (None).

schemaInCatalog is only defined when DeltaCatalog is requested to create a delta table (yet it does not seem to be used at all).
## ImplicitMetadataOperation

WriteIntoDelta is an ImplicitMetadataOperation, an operation that can update the metadata (schema and partitioning) of the delta table.
## Executing Command

```scala
run(
  sparkSession: SparkSession): Seq[Row]
```

run is part of the RunnableCommand (Spark SQL) abstraction.

run requests the DeltaLog to start a new transaction.

run then writes out the data and requests the OptimisticTransaction to commit (using a DeltaOperations.Write operation with the SaveMode, partition columns, replaceWhere and userMetadata).
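Condensed, the control flow looks as follows (a simplified sketch, not the actual implementation verbatim):

```scala
// Simplified sketch of run: start a transaction, write out the data,
// and commit the resulting actions with a DeltaOperations.Write operation
override def run(sparkSession: SparkSession): Seq[Row] = {
  deltaLog.withNewTransaction { txn =>
    val actions = write(txn, sparkSession)
    val operation = DeltaOperations.Write(
      mode, Option(partitionColumns), options.replaceWhere, options.userMetadata)
    txn.commit(actions, operation)
  }
  Seq.empty
}
```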
## Writing Out Data

```scala
write(
  txn: OptimisticTransaction,
  sparkSession: SparkSession): Seq[Action]
```

write checks whether the write is to a delta table that already exists, i.e. whether the readVersion of the transaction is above -1. If so, write branches per the SaveMode:

- For ErrorIfExists, write throws an AnalysisException: `[path] already exists.`
- For Ignore, write does nothing and returns with no Actions.
- For Overwrite, write requests the DeltaLog to assert being removable.

write then updates the metadata (updateMetadata) with the rearrangeOnly option.

write ...FIXME
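A simplified sketch of the SaveMode branching described above (assuming the command's mode, data and deltaLog fields, and updateMetadata of ImplicitMetadataOperation; parameter order simplified):

```scala
// Simplified sketch of write's SaveMode handling (not the actual implementation)
def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
  if (txn.readVersion > -1) {
    // the delta table already exists
    mode match {
      case SaveMode.ErrorIfExists =>
        throw new AnalysisException(s"${deltaLog.dataPath} already exists.")
      case SaveMode.Ignore =>
        return Nil // no actions to commit
      case SaveMode.Overwrite =>
        deltaLog.assertRemovable() // fail fast for append-only tables
      case _ => // SaveMode.Append needs no extra checks
    }
  }
  // update the table schema and partitioning as needed (rearrangeOnly-aware)
  updateMetadata(txn, data, partitionColumns, configuration,
    isOverwriteOperation, options.rearrangeOnly)
  ??? // FIXME in the original: writing out the files themselves
}
```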
write is used when the WriteIntoDelta and CreateDeltaTableCommand commands are executed.
## extractConstraints

```scala
extractConstraints(
  sparkSession: SparkSession,
  expr: Seq[Expression]): Seq[Constraint]
```

For every Expression (in the given expr), extractConstraints checks whether there is an UnresolvedAttribute. If there is one, extractConstraints creates a Check constraint with the following:

| Property | Value |
|---|---|
| name | EXPRESSION(expression) |
| expression | The Expression being handled |

extractConstraints is a noop (returns no Constraints) when spark.databricks.delta.replaceWhere.constraintCheck.enabled is disabled.
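A sketch of the logic (simplified from the actual implementation; the Check constraint name follows the table above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}

// Simplified sketch: a Check constraint is created only for expressions
// that contain an UnresolvedAttribute (i.e. reference a column by name)
def extractConstraints(
    sparkSession: SparkSession,
    expr: Seq[Expression]): Seq[Constraint] = {
  val constraintCheckEnabled = sparkSession.conf
    .get("spark.databricks.delta.replaceWhere.constraintCheck.enabled", "true")
    .toBoolean
  if (!constraintCheckEnabled) Seq.empty // noop when disabled
  else expr.flatMap { e =>
    e.collectFirst { case _: UnresolvedAttribute =>
      Constraints.Check(s"EXPRESSION($e)", e) // name per the table above
    }
  }
}
```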
## removeFiles

```scala
removeFiles(
  spark: SparkSession,
  txn: OptimisticTransaction,
  condition: Seq[Expression]): Seq[Action]
```

removeFiles ...FIXME
## Demo

```scala
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.DeltaOptions

val tableName = "/tmp/delta/t1"
val data = spark.range(5).toDF

val writeCmd = WriteIntoDelta(
  deltaLog = DeltaLog.forTable(spark, tableName),
  mode = SaveMode.Overwrite,
  options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
  partitionColumns = Seq.empty[String],
  configuration = Map.empty[String, String],
  data = data)

// Review web UI @ http://localhost:4040
writeCmd.run(spark)
```
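The write can then be verified by reading the delta table back:

```scala
// Verify the transactional write by reading the delta table back
spark.read.format("delta").load(tableName).show()
```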
## canOverwriteSchema

```scala
canOverwriteSchema: Boolean
```

canOverwriteSchema is part of the ImplicitMetadataOperation abstraction.

canOverwriteSchema is true when all of the following hold (see the sketch below):

- canOverwriteSchema is enabled (true) in the DeltaOptions
- This WriteIntoDelta is an overwrite operation
- The replaceWhere option is not defined (in the DeltaOptions)
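Condensed, the three conditions amount to the following (a sketch of the logic, not necessarily the implementation verbatim):

```scala
// All three conditions combined: the DeltaOptions flag, the overwrite mode,
// and no replaceWhere option
override def canOverwriteSchema: Boolean =
  options.canOverwriteSchema && isOverwriteOperation && options.replaceWhere.isEmpty
```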
## isOverwriteOperation

```scala
isOverwriteOperation: Boolean
```

isOverwriteOperation is true when the SaveMode is SaveMode.Overwrite.
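In other words (a one-line sketch):

```scala
// true only for SaveMode.Overwrite
def isOverwriteOperation: Boolean = mode == SaveMode.Overwrite
```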
isOverwriteOperation is used when:

- WriteIntoDelta is requested for the canOverwriteSchema flag and to write out data
## writeAndReturnCommitData

```scala
writeAndReturnCommitData(
  txn: OptimisticTransaction,
  sparkSession: SparkSession,
  clusterBySpecOpt: Option[ClusterBySpec] = None,
  isTableReplace: Boolean = false): TaggedCommitData[Action]
```

writeAndReturnCommitData is part of the WriteIntoDeltaLike abstraction.

writeAndReturnCommitData ...FIXME
## writeFiles

```scala
writeFiles(
  txn: OptimisticTransaction,
  data: DataFrame,
  options: DeltaOptions): Seq[FileAction]
```

writeFiles ...FIXME