WriteIntoDelta Command¶
WriteIntoDelta is a delta command that can write data (a DataFrame) out transactionally into a delta table (that may already exist or is about to be created).
WriteIntoDelta is a LeafRunnableCommand (Spark SQL) logical operator.
Creating Instance¶
WriteIntoDelta takes the following to be created:

- DeltaLog
- SaveMode
- DeltaOptions
- Names of the partition columns
- Configuration
- Data (DataFrame)
- schemaInCatalog
WriteIntoDelta is created when:

- DeltaDynamicPartitionOverwriteCommand is executed
- DeltaLog is requested to create an insertable HadoopFsRelation (when DeltaDataSource is requested to create a relation as a CreatableRelationProvider or a RelationProvider)
- DeltaCatalog is requested to create a delta table
- WriteIntoDeltaBuilder is requested to build a V1Write
- CreateDeltaTableCommand is executed
- DeltaDataSource is requested to create a relation (for writing) (as a CreatableRelationProvider)
Configuration¶
WriteIntoDelta is given a configuration when created as follows:
- Always empty for DeltaLog
- Always empty for DeltaDataSource
- Existing properties of a delta table in DeltaCatalog (with the comment key based on the value in the catalog)
- Existing configuration (of the Metadata of the Snapshot of the DeltaLog) for WriteIntoDeltaBuilder
- Existing properties of a delta table for CreateDeltaTableCommand (with the comment key based on the value in the catalog)
schemaInCatalog¶
```scala
schemaInCatalog: Option[StructType] = None
```
WriteIntoDelta can be given a StructType (Spark SQL) when created. Unless given, it is assumed undefined.
schemaInCatalog is only defined when DeltaCatalog is requested to create a delta table (yet it does not seem to be used at all).
ImplicitMetadataOperation¶
WriteIntoDelta is an operation that can update metadata (schema and partitioning) of the delta table.
Executing Command¶
RunnableCommand
```scala
run(
  sparkSession: SparkSession): Seq[Row]
```
run is part of the RunnableCommand (Spark SQL) abstraction.
run requests the DeltaLog to start a new transaction.
run writes out the data and requests the OptimisticTransaction to commit (with a DeltaOperations.Write operation with the SaveMode, the partition columns, and the replaceWhere and userMetadata options).
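The overall flow can be summarized with this minimal sketch (assuming the write helper described in the next section; an illustration, not the verbatim Delta Lake source):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.delta.DeltaOperations

// A minimal sketch of run, assuming the write helper described below
override def run(sparkSession: SparkSession): Seq[Row] = {
  deltaLog.withNewTransaction { txn =>
    // Write the data out and collect the resulting Actions
    val actions = write(txn, sparkSession)
    // Describe this write for the commit history
    val operation = DeltaOperations.Write(
      mode, Option(partitionColumns), options.replaceWhere, options.userMetadata)
    // Commit the Actions transactionally
    txn.commit(actions, operation)
  }
  Seq.empty[Row]
}
```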
Writing Out Data¶
```scala
write(
  txn: OptimisticTransaction,
  sparkSession: SparkSession): Seq[Action]
```
write checks whether the write is to a delta table that already exists (i.e. the readVersion of the transaction is above -1). If so, write branches off per the SaveMode:

- For ErrorIfExists, write throws an AnalysisException: [path] already exists.
- For Ignore, write does nothing and returns back with no Actions.
- For Overwrite, write requests the DeltaLog to assert being removable.
write updates the metadata (with the rearrangeOnly option).
write...FIXME
write is used when WriteIntoDelta (this very command) and CreateDeltaTableCommand are executed.
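The SaveMode branching described above can be sketched as follows (a sketch only: the trailing steps are elided, and deltaLog.assertRemovable() stands for the removability assertion):

```scala
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.actions.Action

// A sketch of the SaveMode branching in write (not the complete body)
def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
  if (txn.readVersion > -1) {
    // The delta table already exists
    mode match {
      case SaveMode.ErrorIfExists =>
        throw new AnalysisException(s"${deltaLog.dataPath} already exists.")
      case SaveMode.Ignore =>
        return Nil // nothing to write, no Actions
      case SaveMode.Overwrite =>
        deltaLog.assertRemovable() // fail fast for append-only tables
      case _ => // SaveMode.Append falls through
    }
  }
  // ...FIXME: updateMetadata (with rearrangeOnly), write out the data,
  // compute remove actions for Overwrite
  ???
}
```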
extractConstraints¶
```scala
extractConstraints(
  sparkSession: SparkSession,
  expr: Seq[Expression]): Seq[Constraint]
```
For every Expression (in the given expr), extractConstraints checks whether it contains an UnresolvedAttribute. If it does, extractConstraints creates a Check constraint with the following:
| Property | Value |
|---|---|
| name | EXPRESSION(expression) |
| expression | The Expression being handled |
Noop with spark.databricks.delta.replaceWhere.constraintCheck.enabled disabled
extractConstraints returns no Constraints for spark.databricks.delta.replaceWhere.constraintCheck.enabled disabled.
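A minimal sketch of this logic, assuming Constraints.Check and the DeltaSQLConf.REPLACEWHERE_CONSTRAINT_CHECK_ENABLED configuration entry (the exact lookup in the Delta sources may differ):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

def extractConstraints(
    sparkSession: SparkSession,
    expr: Seq[Expression]): Seq[Constraint] = {
  if (!sparkSession.sessionState.conf.getConf(
      DeltaSQLConf.REPLACEWHERE_CONSTRAINT_CHECK_ENABLED)) {
    Seq.empty // noop with the constraint check disabled
  } else {
    expr.flatMap { e =>
      // Only expressions with an UnresolvedAttribute become Check constraints
      val hasUnresolved =
        e.collectFirst { case _: UnresolvedAttribute => () }.isDefined
      if (hasUnresolved) Some(Constraints.Check(s"EXPRESSION($e)", e)) else None
    }
  }
}
```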
removeFiles¶
```scala
removeFiles(
  spark: SparkSession,
  txn: OptimisticTransaction,
  condition: Seq[Expression]): Seq[Action]
```
removeFiles...FIXME
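While the body is still a FIXME above, conceptually removeFiles produces the remove-side Actions of an overwrite with a condition. A heavily hedged, file-level approximation (the helpers used are an assumption; the actual command may delete at a finer granularity):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.delta.actions.Action

// A conceptual sketch only: remove the files that (may) match the condition
def removeFiles(
    spark: SparkSession,
    txn: OptimisticTransaction,
    condition: Seq[Expression]): Seq[Action] = {
  val combined = condition.reduceOption(And).getOrElse(Literal.TrueLiteral)
  // filterFiles narrows the snapshot down to the matching AddFiles
  val matchingFiles = txn.filterFiles(Seq(combined))
  // Turn every matching AddFile into its RemoveFile counterpart
  matchingFiles.map(_.remove)
}
```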
Demo¶
```scala
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.DeltaOptions

val tableName = "/tmp/delta/t1"
val data = spark.range(5).toDF

val writeCmd = WriteIntoDelta(
  deltaLog = DeltaLog.forTable(spark, tableName),
  mode = SaveMode.Overwrite,
  options = new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
  partitionColumns = Seq.empty[String],
  configuration = Map.empty[String, String],
  data)

// Review web UI @ http://localhost:4040
writeCmd.run(spark)
```
canOverwriteSchema¶
ImplicitMetadataOperation
```scala
canOverwriteSchema: Boolean
```
canOverwriteSchema is part of the ImplicitMetadataOperation abstraction.
canOverwriteSchema is true when all the following hold:
- canOverwriteSchema is enabled (true) (in the DeltaOptions)
- This WriteIntoDelta is an overwrite operation
- replaceWhere option is not defined (in the DeltaOptions)
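That is a conjunction of the three conditions, which can be sketched as a one-liner (assuming the DeltaOptions accessors shown; not the verbatim source):

```scala
// A sketch: all three conditions must hold
override def canOverwriteSchema: Boolean =
  options.canOverwriteSchema && isOverwriteOperation && options.replaceWhere.isEmpty
```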
isOverwriteOperation¶
```scala
isOverwriteOperation: Boolean
```
isOverwriteOperation is true when the SaveMode is SaveMode.Overwrite.
isOverwriteOperation is used when:
- WriteIntoDelta is requested for the canOverwriteSchema flag and to write data out
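As a sketch, the flag is a single comparison (mode being the SaveMode this WriteIntoDelta was created with):

```scala
// A sketch: true only for SaveMode.Overwrite
override def isOverwriteOperation: Boolean = mode == SaveMode.Overwrite
```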
writeAndReturnCommitData¶
WriteIntoDeltaLike
```scala
writeAndReturnCommitData(
  txn: OptimisticTransaction,
  sparkSession: SparkSession,
  clusterBySpecOpt: Option[ClusterBySpec] = None,
  isTableReplace: Boolean = false): TaggedCommitData[Action]
```
writeAndReturnCommitData is part of the WriteIntoDeltaLike abstraction.
writeAndReturnCommitData...FIXME
writeFiles¶
```scala
writeFiles(
  txn: OptimisticTransaction,
  data: DataFrame,
  options: DeltaOptions): Seq[FileAction]
```
writeFiles...FIXME
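Given the signature, a plausible sketch is a delegation to the transactional writer of the OptimisticTransaction (an assumption based on the TransactionalWrite contract, not the verified body):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.delta.{DeltaOptions, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.FileAction

// A sketch: delegate to OptimisticTransaction.writeFiles with the options
def writeFiles(
    txn: OptimisticTransaction,
    data: DataFrame,
    options: DeltaOptions): Seq[FileAction] =
  txn.writeFiles(data, Some(options))
```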