# CreateDeltaTableCommand

CreateDeltaTableCommand is a LeafRunnableCommand (Spark SQL) to create a delta table (for DeltaCatalog).

CreateDeltaTableCommand represents the following SQL commands at execution for delta tables:

- `CREATE TABLE ... LIKE` (Spark SQL)
- `SHALLOW CLONE`

CreateDeltaTableCommand is a DeltaCommand.

CreateDeltaTableCommand is a CreateDeltaTableLike command.
## Creating Instance

CreateDeltaTableCommand takes the following to be created:

- CatalogTable (Spark SQL)
- Existing CatalogTable (if available)
- SaveMode
- Logical Query Plan
- Create Operation
- tableByPath flag (default: false)
- Output attributes
- Protocol (optional)
- allowCatalogManaged flag
- Create Table Function (default: undefined)
CreateDeltaTableCommand is created when:

- DeltaAnalysis logical resolution rule is executed for the following:
    - CreateTableLikeCommand (with a delta table as the source or `delta` as the provider)
    - CloneTableStatement
- AbstractDeltaCatalog is requested to create a delta table
## allowCatalogManaged Flag

Note: allowCatalogManaged is part of the CreateDeltaTableLike abstraction.

CreateDeltaTableCommand can be given the allowCatalogManaged flag when created.

allowCatalogManaged is disabled (false) by default.

allowCatalogManaged can be enabled (true) only when AbstractDeltaCatalog is requested to create a delta table with Unity Catalog configured.
## Optional Logical Query Plan
CreateDeltaTableCommand can be given a LogicalPlan (Spark SQL) when created.
The LogicalPlan can be one of the following (that triggers a custom handling):
| Logical Query Plan | Handler |
|---|---|
| CloneTableCommand | handleClone |
| WriteIntoDeltaLike | handleCreateTableAsSelect |
| Some other LogicalPlan | handleCreateTableAsSelect |
| Undefined | handleCreateTable |
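The dispatch above can be sketched as a pattern match over the optional query. This is a minimal, self-contained sketch: the traits and classes below are stand-ins for the actual Delta and Spark SQL classes, not their real definitions.

```scala
// Stand-ins for the actual Delta/Spark SQL classes (assumptions for illustration)
trait LogicalPlan
class CloneTableCommand extends LogicalPlan
trait WriteIntoDeltaLike extends LogicalPlan
class SomeOtherPlan extends LogicalPlan

// Sketch of how CreateDeltaTableCommand picks a handler for the optional query
def handlerFor(query: Option[LogicalPlan]): String = query match {
  case Some(_: CloneTableCommand)  => "handleClone"
  case Some(_: WriteIntoDeltaLike) => "handleCreateTableAsSelect"
  case Some(_)                     => "handleCreateTableAsSelect" // wrapped in a new WriteIntoDelta
  case None                        => "handleCreateTable"
}
```

The undefined case (no query) corresponds to a plain `CREATE TABLE`, while any defined query makes the command behave as a clone or a CTAS.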
## Create Operation

CreateDeltaTableCommand can be given a CreationMode when created:

- `Create` (default)
- `CreateOrReplace`
- `Replace`

CreationMode is Create by default or specified by DeltaCatalog.
## Execute Command

Note: run is part of the RunnableCommand (Spark SQL) abstraction.
run...FIXME
### DELTA_UNSUPPORTED_CATALOG_MANAGED_TABLE_CREATION Error

run checks whether it is allowed to create a catalog-managed table using the following:

- This allowCatalogManaged flag is enabled and CatalogOwnedTableFeature is among the supported table features (of this CatalogTable)
- The spark.databricks.delta.properties.defaults.feature.catalogManaged configuration property is enabled globally

If neither condition holds, run throws a DeltaUnsupportedOperationException.
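The check can be sketched as a pure boolean function. This is only a sketch of the two conditions above; the parameter names are assumptions for illustration, not the actual field names.

```scala
// Sketch of run's catalog-managed creation check (parameter names are hypothetical)
def canCreateCatalogManagedTable(
    allowCatalogManaged: Boolean,
    catalogOwnedFeatureSupported: Boolean,   // CatalogOwnedTableFeature among the supported table features
    catalogManagedEnabledByDefault: Boolean  // spark.databricks.delta.properties.defaults.feature.catalogManaged
): Boolean = {
  (allowCatalogManaged && catalogOwnedFeatureSupported) || catalogManagedEnabledByDefault
}
```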
## updateCatalog

updateCatalog uses the given SparkSession to access the SessionCatalog to createTable or alterTable when the tableByPath flag is off. Otherwise, updateCatalog does nothing.
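The branching can be sketched as follows. This is only an illustration: the Catalog trait is a stand-in for Spark SQL's SessionCatalog, and the tableExists parameter (deciding between createTable and alterTable) is an assumption, not taken from the text above.

```scala
// Stand-in for the relevant part of Spark SQL's SessionCatalog (assumption for illustration)
trait Catalog {
  def createTable(name: String): Unit = ()
  def alterTable(name: String): Unit = ()
}

// Sketch: updateCatalog only touches the session catalog for catalog-backed tables;
// path-based tables (tableByPath enabled) are left alone
def updateCatalogSketch(tableByPath: Boolean, tableExists: Boolean, name: String, catalog: Catalog): String = {
  if (tableByPath) "noop"
  else if (tableExists) { catalog.alterTable(name); "alterTable" }
  else { catalog.createTable(name); "createTable" }
}
```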
## getOperation

```scala
getOperation(
  metadata: Metadata,
  isManagedTable: Boolean,
  options: Option[DeltaOptions]): DeltaOperations.Operation
```
getOperation...FIXME
## replaceMetadataIfNecessary

```scala
replaceMetadataIfNecessary(
  txn: OptimisticTransaction,
  tableDesc: CatalogTable,
  options: DeltaOptions,
  schema: StructType): Unit
```

Note: the tableDesc argument is not used.
replaceMetadataIfNecessary determines whether this is a replace operation (i.e., the CreationMode is CreateOrReplace or Replace).

replaceMetadataIfNecessary determines whether the schema of the delta table is to be overwritten (based on the overwriteSchema option in the given DeltaOptions).

In the end, only for a CreateOrReplace or Replace operation on an existing delta table with the overwriteSchema option enabled, replaceMetadataIfNecessary updates the metadata (on the given OptimisticTransaction) with the given schema.
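The decision can be sketched as a pure function over the three conditions above. This is only a sketch: the CreationMode objects mirror the modes listed earlier, and tableExists stands in for "the delta table already exists".

```scala
// Stand-in for Delta's CreationMode (names follow the modes listed above)
sealed trait CreationMode
case object Create extends CreationMode
case object CreateOrReplace extends CreationMode
case object Replace extends CreationMode

// Sketch: metadata is replaced only for a replace operation on an existing table
// with the overwriteSchema option enabled
def shouldReplaceMetadata(
    mode: CreationMode,
    tableExists: Boolean,
    overwriteSchema: Boolean): Boolean = {
  val isReplace = mode == CreateOrReplace || mode == Replace
  isReplace && tableExists && overwriteSchema
}
```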
### DeltaIllegalArgumentException

replaceMetadataIfNecessary throws a DeltaIllegalArgumentException for a CreateOrReplace or Replace operation with the overwriteSchema option enabled.
## Handling Transaction Commit

```scala
handleCommit(
  sparkSession: SparkSession,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Seq[Row]
```
handleCommit starts a transaction.
handleCommit executes one of the following to handle the query (and gives the result to be returned):

- CloneTableCommand
- WriteIntoDeltaLike
- Some other query (that is neither a CloneTableCommand nor a WriteIntoDeltaLike)
- No query
In the end, handleCommit runs post-commit updates.
### CloneTableCommand
handleCommit checkPathEmpty.
handleCommit requests the CloneTableCommand to handleClone.
### WriteIntoDeltaLike
handleCommit checkPathEmpty.
handleCommit handleCreateTableAsSelect.
### Some Other Query
handleCommit checkPathEmpty.
handleCommit makes sure that the query is not a RunnableCommand (Spark SQL) or throws an IllegalArgumentException.
handleCommit handleCreateTableAsSelect with a new WriteIntoDelta.
### No Query

With no query specified, handleCommit handleCreateTable.
## Executing Post-Commit Updates

```scala
runPostCommitUpdates(
  sparkSession: SparkSession,
  txnUsedForCommit: OptimisticTransaction,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Unit
```

Note: runPostCommitUpdates is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
runPostCommitUpdates prints out the following INFO message to the logs:
runPostCommitUpdates requests the given DeltaLog to update.
runPostCommitUpdates updates the catalog.
In the end, when delta.universalFormat.enabledFormats table property contains iceberg, runPostCommitUpdates requests the UniversalFormatConverter to convertSnapshot.
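The check on the table property can be sketched as follows. Only the delta.universalFormat.enabledFormats key and the iceberg value come from the text above; the comma-separated parsing is an assumption for illustration.

```scala
// Sketch: decide whether to trigger Iceberg conversion after a commit,
// based on the delta.universalFormat.enabledFormats table property
// (comma-separated parsing is an assumption for illustration)
def shouldConvertToIceberg(tableProperties: Map[String, String]): Boolean =
  tableProperties
    .get("delta.universalFormat.enabledFormats")
    .exists(_.split(",").map(_.trim).contains("iceberg"))
```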
## Updating Table Catalog

```scala
updateCatalog(
  spark: SparkSession,
  table: CatalogTable,
  snapshot: Snapshot,
  didNotChangeMetadata: Boolean): Unit
```

Note: updateCatalog is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

Note: the didNotChangeMetadata argument is not used.
updateCatalog prints out the following INFO message to the logs:
updateCatalog requests the given DeltaLog to update.
updateCatalog updates the catalog.
In the end, when delta.universalFormat.enabledFormats table property contains iceberg, updateCatalog requests the UniversalFormatConverter to convertSnapshot.
## handleCreateTable

```scala
handleCreateTable(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  tableWithLocation: CatalogTable,
  fs: FileSystem,
  hadoopConf: Configuration): Unit
```
handleCreateTable...FIXME
## handleCreateTableAsSelect

```scala
handleCreateTableAsSelect(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  deltaLog: DeltaLog,
  deltaWriter: WriteIntoDeltaLike,
  tableWithLocation: CatalogTable): Unit
```
handleCreateTableAsSelect...FIXME
## Provided Metadata

getProvidedMetadata gives a new Metadata with values copied directly from the given CatalogTable (Spark SQL).

This Metadata has the clustering columns property removed from the table properties and uses the given schema (schemaString).
getProvidedMetadata creates a Metadata with the following:
| Metadata | Value |
|---|---|
| Description | The comment of the given CatalogTable (Spark SQL), if defined |
| Schema | The given schemaString |
| Partition Columns | The partitionColumnNames of the given CatalogTable (Spark SQL) |
| Table Configuration | The properties of the given CatalogTable (Spark SQL) with the clustering columns property removed |
| Created Time | The current time |
getProvidedMetadata is used when:

- CreateDeltaTableCommand is requested to handleCreateTable and replaceMetadataIfNecessary
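The table above can be sketched as a small constructor over plain values. This is only an illustration: MetadataSketch is a stand-in for Delta's Metadata, and the "clusteringColumns" key name is a hypothetical property key, not necessarily the one Delta actually uses.

```scala
// Minimal stand-in for Delta's Metadata (fields follow the table above; illustration only)
case class MetadataSketch(
    description: Option[String],
    schemaString: String,
    partitionColumns: Seq[String],
    configuration: Map[String, String],
    createdTime: Long)

// Sketch of getProvidedMetadata: copy the catalog-table values and drop the
// clustering-columns property ("clusteringColumns" is a hypothetical key name)
def providedMetadata(
    comment: Option[String],
    schemaString: String,
    partitionColumnNames: Seq[String],
    properties: Map[String, String]): MetadataSketch =
  MetadataSketch(
    description = comment,
    schemaString = schemaString,
    partitionColumns = partitionColumnNames,
    configuration = properties - "clusteringColumns",
    createdTime = System.currentTimeMillis())
```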
## Logging

Enable ALL logging level for the org.apache.spark.sql.delta.commands.CreateDeltaTableCommand logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.CreateDeltaTableCommand.name = org.apache.spark.sql.delta.commands.CreateDeltaTableCommand
logger.CreateDeltaTableCommand.level = all
```

Refer to Logging.