# CreateDeltaTableCommand
CreateDeltaTableCommand is a LeafRunnableCommand (Spark SQL) to create a delta table (for DeltaCatalog).
CreateDeltaTableCommand is a DeltaCommand.
## Creating Instance
CreateDeltaTableCommand takes the following to be created:
- CatalogTable (Spark SQL)
- Existing CatalogTable (if available)
- SaveMode
- Logical Query Plan
- CreationMode
- tableByPath flag (default: false)
- Output attributes
- Optional Protocol
CreateDeltaTableCommand is created when:
- DeltaAnalysis logical resolution rule is executed for the following:
    - CreateTableLikeCommand (with a delta table as the source or the provider being delta)
    - CloneTableStatement
- DeltaCatalog is requested to create a delta table
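For a quick feel of when this command kicks in, here is an illustrative example, assuming a SparkSession with the standard Delta Lake configuration (the Delta SQL extension and DeltaCatalog):

```scala
// Illustrative only: with the standard Delta Lake session configuration,
// this CREATE TABLE goes through DeltaCatalog, which creates and runs
// a CreateDeltaTableCommand behind the scenes.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CreateDeltaTableCommand demo")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

spark.sql("CREATE TABLE demo (id LONG, name STRING) USING delta")
```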
## Optional Logical Query Plan
```scala
query: Option[LogicalPlan]
```
CreateDeltaTableCommand can be given a LogicalPlan (Spark SQL) when created.
The LogicalPlan can be one of the following (that triggers a custom handling):
| Logical Query Plan | Handler |
|---|---|
| CloneTableCommand | handleClone |
| WriteIntoDeltaLike | handleCreateTableAsSelect |
| Some other LogicalPlan | handleCreateTableAsSelect |
| Undefined | handleCreateTable |
## CreationMode
CreateDeltaTableCommand can be given a CreationMode when created:
- Create (default)
- CreateOrReplace
- Replace
CreationMode is Create by default or specified by DeltaCatalog.
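As an illustration, the following standard Spark SQL statements map to the three modes (reusing `spark` from the earlier example):

```scala
// Illustrative only: the SQL statement decides the CreationMode
// that DeltaCatalog passes on to CreateDeltaTableCommand.
spark.sql("CREATE TABLE t (id LONG) USING delta")            // Create
spark.sql("CREATE OR REPLACE TABLE t (id LONG) USING delta") // CreateOrReplace
spark.sql("REPLACE TABLE t (id LONG) USING delta")           // Replace (t must already exist)
```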
## Executing Command
```scala
run(
  sparkSession: SparkSession): Seq[Row]
```
run is part of the RunnableCommand (Spark SQL) abstraction.
run...FIXME
## updateCatalog
```scala
updateCatalog(
  spark: SparkSession,
  table: CatalogTable): Unit
```
updateCatalog uses the given SparkSession to access SessionCatalog to createTable or alterTable when the tableByPath flag is off. Otherwise, updateCatalog does nothing.
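A minimal sketch of this control flow, with a stand-in catalog trait and a tableExists flag standing in for the existing-table check (not the actual Delta sources, which use Spark's internal SessionCatalog):

```scala
// A sketch of the control flow only (stand-in types; not the actual
// Delta implementation).
final case class TableDef(name: String)

trait MetastoreCatalog {
  def createTable(table: TableDef, ignoreIfExists: Boolean): Unit
  def alterTable(table: TableDef): Unit
}

def updateCatalogSketch(
    catalog: MetastoreCatalog,
    table: TableDef,
    tableByPath: Boolean,
    tableExists: Boolean): Unit = {
  if (tableByPath) return // path-based tables leave no trace in the metastore
  if (tableExists) catalog.alterTable(table)
  else catalog.createTable(table, ignoreIfExists = false)
}
```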
## getOperation
```scala
getOperation(
  metadata: Metadata,
  isManagedTable: Boolean,
  options: Option[DeltaOptions]): DeltaOperations.Operation
```
getOperation...FIXME
## replaceMetadataIfNecessary
```scala
replaceMetadataIfNecessary(
  txn: OptimisticTransaction,
  tableDesc: CatalogTable,
  options: DeltaOptions,
  schema: StructType): Unit
```
tableDesc Unused
The tableDesc argument is not used.
replaceMetadataIfNecessary determines whether or not it is a replace operation (i.e., the CreationMode is CreateOrReplace or Replace).
replaceMetadataIfNecessary determines whether or not it is supposed not to overwrite the schema of the delta table (i.e., the overwriteSchema option is explicitly disabled in the input DeltaOptions).
In the end, only for a CreateOrReplace or Replace operation on an existing delta table, and unless the overwriteSchema option is explicitly disabled, replaceMetadataIfNecessary updates the metadata (of the given OptimisticTransaction) with the given schema.
### DeltaIllegalArgumentException
replaceMetadataIfNecessary throws a DeltaIllegalArgumentException for a CreateOrReplace or Replace operation with the overwriteSchema option explicitly disabled (a replace always overwrites the schema, so disabling the option is not supported):
```text
The usage of overwriteSchema is not allowed when replacing a Delta table.
```
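A compact sketch of the rules above (simplified conditions and a stand-in exception; not the actual Delta sources, which throw DeltaIllegalArgumentException):

```scala
// A sketch of replaceMetadataIfNecessary's decision logic
// (simplified; not the actual Delta sources).
def replaceMetadataIfNecessarySketch(
    isReplace: Boolean,           // CreationMode is CreateOrReplace or Replace
    tableExists: Boolean,         // the delta table already exists
    overwriteSchemaOption: Option[Boolean]): Unit = {
  // Explicitly disabling overwriteSchema contradicts a replace,
  // which always overwrites the schema.
  val dontOverwriteSchema = overwriteSchemaOption.contains(false)
  if (isReplace && dontOverwriteSchema) {
    throw new IllegalArgumentException(
      "The usage of overwriteSchema is not allowed when replacing a Delta table.")
  }
  if (isReplace && tableExists && !dontOverwriteSchema) {
    // update the transaction's metadata with the schema given to the command
    println("replacing the table metadata with the provided schema")
  }
}
```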
## Handling Transaction Commit
```scala
handleCommit(
  sparkSession: SparkSession,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Seq[Row]
```
handleCommit starts a transaction.
handleCommit executes one of the following branches to handle the query (and give the result to be returned):
- CloneTableCommand
- WriteIntoDeltaLike
- Some other query (that is neither a CloneTableCommand nor a WriteIntoDeltaLike)
- No query
In the end, handleCommit runs post-commit updates.
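The overall dispatch can be pictured with the following sketch (stand-in types; the handler names mirror the sections below, everything else is simplified, not the actual Delta sources):

```scala
// A sketch of handleCommit's dispatch on the optional query.
sealed trait QueryPlan
case object CloneTable extends QueryPlan                   // CloneTableCommand
case object DeltaWriter extends QueryPlan                  // WriteIntoDeltaLike
final case class OtherPlan(isRunnableCommand: Boolean) extends QueryPlan

def handleCommitSketch(query: Option[QueryPlan]): String = {
  val result = query match {
    case Some(CloneTable) =>   // checkPathEmpty, then handleClone
      "handleClone"
    case Some(DeltaWriter) =>  // checkPathEmpty, then CTAS
      "handleCreateTableAsSelect"
    case Some(OtherPlan(isRunnableCommand)) => // checkPathEmpty first
      require(!isRunnableCommand, "query must not be a RunnableCommand")
      "handleCreateTableAsSelect with a new WriteIntoDelta"
    case None =>               // no query
      "handleCreateTable"
  }
  // in the end: runPostCommitUpdates
  result
}
```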
### CloneTableCommand
handleCommit checkPathEmpty.
handleCommit requests the CloneTableCommand to handleClone.
### WriteIntoDeltaLike
handleCommit checkPathEmpty.
handleCommit handleCreateTableAsSelect.
### Some Other Query
handleCommit checkPathEmpty.
handleCommit makes sure that the query is not a RunnableCommand (Spark SQL) or throws an IllegalArgumentException.
handleCommit handleCreateTableAsSelect with a new WriteIntoDelta.
### No Query
With no query specified, handleCommit handleCreateTable.
## Executing Post-Commit Updates
```scala
runPostCommitUpdates(
  sparkSession: SparkSession,
  txnUsedForCommit: OptimisticTransaction,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Unit
```
Procedure
runPostCommitUpdates is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
runPostCommitUpdates prints out the following INFO message to the logs:
```text
Table is path-based table: [tableByPath]. Update catalog with mode: [operation]
```
runPostCommitUpdates requests the given DeltaLog to update.
runPostCommitUpdates updates the catalog.
In the end, when delta.universalFormat.enabledFormats table property contains iceberg, runPostCommitUpdates requests the UniversalFormatConverter to convertSnapshot.
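For illustration, the table-property check can be sketched as follows (an assumed helper; the property key is the one named above, everything else is simplified, not Delta's actual UniversalFormat code):

```scala
// Illustrative helper (not Delta's actual UniversalFormat code): decide
// whether Iceberg conversion should run, from the table configuration.
def icebergEnabled(tableConfiguration: Map[String, String]): Boolean =
  tableConfiguration
    .getOrElse("delta.universalFormat.enabledFormats", "")
    .split(",")
    .map(_.trim)
    .contains("iceberg")

icebergEnabled(Map("delta.universalFormat.enabledFormats" -> "iceberg")) // true
icebergEnabled(Map.empty)                                                // false
```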
## Updating Table Catalog
```scala
updateCatalog(
  spark: SparkSession,
  table: CatalogTable,
  snapshot: Snapshot,
  didNotChangeMetadata: Boolean): Unit
```
Procedure
updateCatalog is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).
didNotChangeMetadata Not Used
The didNotChangeMetadata argument is not used.
updateCatalog requests the SessionCatalog to createTable or alterTable the given CatalogTable (based on the CreationMode and whether the table already exists), unless the tableByPath flag is on (in which case updateCatalog does nothing).
## handleCreateTable
```scala
handleCreateTable(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  tableWithLocation: CatalogTable,
  fs: FileSystem,
  hadoopConf: Configuration): Unit
```
handleCreateTable...FIXME
## handleCreateTableAsSelect
```scala
handleCreateTableAsSelect(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  deltaLog: DeltaLog,
  deltaWriter: WriteIntoDeltaLike,
  tableWithLocation: CatalogTable): Unit
```
handleCreateTableAsSelect...FIXME
## Provided Metadata
```scala
getProvidedMetadata(
  table: CatalogTable,
  schemaString: String): Metadata
```
getProvidedMetadata gives a new Metadata with values copied directly from the given CatalogTable (Spark SQL).
This Metadata uses the given schema (schemaString) and has the clustering columns property removed from the table properties.
getProvidedMetadata creates a Metadata with the following:
| Metadata | Value |
|---|---|
| Description | The comment of the given CatalogTable (Spark SQL), if defined |
| Schema | The given schemaString |
| Partition Columns | The partitionColumnNames of the given CatalogTable (Spark SQL) |
| Table Configuration | clustering columns property removed from the properties of the given CatalogTable (Spark SQL) |
| Created Time | The current time |
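A sketch of the assembly above, with a stand-in Metadata type (the "clusteringColumns" property key is an assumption for illustration, not taken from this page):

```scala
// A sketch of the assembly above (stand-in Metadata type; the property key
// "clusteringColumns" is an assumption for illustration).
import org.apache.spark.sql.catalyst.catalog.CatalogTable

final case class MetadataSketch(
    description: String,
    schemaString: String,
    partitionColumns: Seq[String],
    configuration: Map[String, String],
    createdTime: Long)

def getProvidedMetadataSketch(table: CatalogTable, schemaString: String): MetadataSketch =
  MetadataSketch(
    description = table.comment.orNull,                      // table comment, if defined
    schemaString = schemaString,                             // the given schema
    partitionColumns = table.partitionColumnNames,           // partitioning as-is
    configuration = table.properties - "clusteringColumns",  // drop clustering columns
    createdTime = System.currentTimeMillis())                // the current time
```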
getProvidedMetadata is used when:
- CreateDeltaTableCommand is requested to handleCreateTable and replaceMetadataIfNecessary
## Logging
Enable `ALL` logging level for the `org.apache.spark.sql.delta.commands.CreateDeltaTableCommand` logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
```properties
logger.CreateDeltaTableCommand.name = org.apache.spark.sql.delta.commands.CreateDeltaTableCommand
logger.CreateDeltaTableCommand.level = all
```
Refer to Logging.