# CreateDeltaTableCommand

`CreateDeltaTableCommand` is a `LeafRunnableCommand` (Spark SQL) to create a delta table (for DeltaCatalog).

`CreateDeltaTableCommand` is a DeltaCommand.
## Creating Instance

`CreateDeltaTableCommand` takes the following to be created:

- `CatalogTable` (Spark SQL)
- Existing `CatalogTable` (if available)
- `SaveMode`
- Logical Query Plan
- CreationMode
- `tableByPath` flag (default: `false`)
- Output attributes
- Optional Protocol
`CreateDeltaTableCommand` is created when:

- DeltaAnalysis logical resolution rule is executed for the following:
    - `CreateTableLikeCommand` (with the delta table as the source or the provider being `delta`)
    - `CloneTableStatement`
- DeltaCatalog is requested to create a delta table
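For demonstration, a `CREATE TABLE ... USING delta` statement goes through `DeltaCatalog` and ends up as a `CreateDeltaTableCommand`. A minimal sketch, assuming the delta-spark dependency is on the classpath (the app and table names are made up):

```scala
import org.apache.spark.sql.SparkSession

// A Spark session with Delta Lake wired in (the standard Delta Lake setup)
val spark = SparkSession.builder()
  .appName("CreateDeltaTableCommand demo")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// DeltaCatalog is requested to create a delta table and
// runs a CreateDeltaTableCommand under the covers
spark.sql("CREATE TABLE demo_table (id LONG, name STRING) USING delta")
```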
## Optional Logical Query Plan

```scala
query: Option[LogicalPlan]
```

`CreateDeltaTableCommand` can be given a `LogicalPlan` (Spark SQL) when created.

The `LogicalPlan` can be one of the following (that triggers a custom handling):

| Logical Query Plan | Handler |
|---|---|
| CloneTableCommand | handleClone |
| WriteIntoDeltaLike | handleCreateTableAsSelect |
| Some other `LogicalPlan` | handleCreateTableAsSelect |
| Undefined (`None`) | handleCreateTable |
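As a rough illustration (continuing the demo session above; table names are made up), a CTAS statement gives the command a query while a plain `CREATE TABLE` does not:

```scala
// CREATE TABLE ... AS SELECT carries a query (a LogicalPlan),
// so the command goes through handleCreateTableAsSelect
spark.sql("CREATE TABLE demo_ctas USING delta AS SELECT id FROM range(5)")

// A plain CREATE TABLE carries no query (None),
// so the command goes through handleCreateTable
spark.sql("CREATE TABLE demo_plain (id LONG) USING delta")
```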
## CreationMode

`CreateDeltaTableCommand` can be given a `CreationMode` when created:

- `Create` (default)
- `CreateOrReplace`
- `Replace`

`CreationMode` is `Create` by default or specified by DeltaCatalog.
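As a rough mapping from SQL (continuing the demo session above; the `CreationMode` per statement in the comments follows the list above):

```scala
// CREATE TABLE            -> Create (the default)
spark.sql("CREATE TABLE demo_modes (id LONG) USING delta")

// CREATE OR REPLACE TABLE -> CreateOrReplace
spark.sql("CREATE OR REPLACE TABLE demo_modes (id LONG, extra STRING) USING delta")

// REPLACE TABLE           -> Replace (the table must already exist)
spark.sql("REPLACE TABLE demo_modes (id LONG) USING delta")
```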
## Executing Command

```scala
run(
  sparkSession: SparkSession): Seq[Row]
```

`run` is part of the `RunnableCommand` (Spark SQL) abstraction.

`run`...FIXME
## updateCatalog

```scala
updateCatalog(
  spark: SparkSession,
  table: CatalogTable): Unit
```

`updateCatalog` uses the given `SparkSession` to access `SessionCatalog` to `createTable` or `alterTable` when the tableByPath flag is off. Otherwise, `updateCatalog` does nothing.
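A minimal sketch of that dispatch (an assumption-laden simplification, not the actual Delta Lake sources; `tableExists` stands in for the command's existing `CatalogTable`, and the sketch lives in an `org.apache.spark.sql` package since `SparkSession.sessionState` is `private[sql]`):

```scala
package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

object UpdateCatalogSketch {
  def updateCatalog(
      spark: SparkSession,
      table: CatalogTable,
      tableByPath: Boolean,
      tableExists: Boolean): Unit = {
    if (tableByPath) return // do nothing for path-based tables
    val sessionCatalog = spark.sessionState.catalog // SessionCatalog
    if (tableExists) {
      sessionCatalog.alterTable(table)
    } else {
      sessionCatalog.createTable(table, ignoreIfExists = false)
    }
  }
}
```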
## getOperation

```scala
getOperation(
  metadata: Metadata,
  isManagedTable: Boolean,
  options: Option[DeltaOptions]): DeltaOperations.Operation
```

`getOperation`...FIXME
## replaceMetadataIfNecessary

```scala
replaceMetadataIfNecessary(
  txn: OptimisticTransaction,
  tableDesc: CatalogTable,
  options: DeltaOptions,
  schema: StructType): Unit
```

!!! note "tableDesc Unused"
    The `tableDesc` argument is not used.

`replaceMetadataIfNecessary` determines whether or not this is a replace operation (i.e., `CreateOrReplace` or `Replace`, based on the CreationMode).

`replaceMetadataIfNecessary` determines whether or not the schema of the delta table must not be overwritten (based on the overwriteSchema option in the input DeltaOptions).

In the end, only for a `CreateOrReplace` or `Replace` operation on an existing delta table (and unless the overwriteSchema option explicitly disallows overwriting the schema), `replaceMetadataIfNecessary` updates the metadata (on the given OptimisticTransaction) with the given `schema`.
### DeltaIllegalArgumentException

`replaceMetadataIfNecessary` throws a `DeltaIllegalArgumentException` for a `CreateOrReplace` or `Replace` operation when the overwriteSchema option disallows overwriting the schema:

```text
The usage of overwriteSchema is not allowed when replacing a Delta table.
```
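Putting the two preceding paragraphs together, a hypothetical sketch of the whole method (not the actual Delta Lake sources; `dontOverwriteSchema`, `tableExists`, and `updateMetadata` are stand-ins for values the command derives itself):

```scala
def replaceMetadataIfNecessarySketch(
    isReplace: Boolean,           // CreationMode is CreateOrReplace or Replace
    dontOverwriteSchema: Boolean, // overwriteSchema option explicitly set to false
    tableExists: Boolean,         // the transaction reads an existing delta table
    updateMetadata: () => Unit    // updates the transaction's metadata
): Unit = {
  if (isReplace && dontOverwriteSchema) {
    throw new IllegalArgumentException(
      "The usage of overwriteSchema is not allowed when replacing a Delta table.")
  }
  if (isReplace && tableExists && !dontOverwriteSchema) {
    updateMetadata() // replace the table metadata with the given schema
  }
}
```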
## Handling Transaction Commit

```scala
handleCommit(
  sparkSession: SparkSession,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Seq[Row]
```

`handleCommit` starts a transaction.

`handleCommit` handles the query with one of the following (that gives the result to be returned), as sketched after this list:

- CloneTableCommand
- WriteIntoDeltaLike
- Some other query (that is neither a CloneTableCommand nor a WriteIntoDeltaLike)
- No query

In the end, `handleCommit` runs post-commit updates.
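A self-contained sketch of that dispatch (the stand-in types below are made up and are not the actual Delta Lake or Spark SQL classes):

```scala
sealed trait QueryPlan
final case class CloneTable() extends QueryPlan
final case class WriteIntoDelta() extends QueryPlan
final case class OtherPlan() extends QueryPlan

def handleCommitSketch(query: Option[QueryPlan]): String = query match {
  case Some(_: CloneTable)     => "checkPathEmpty, then handleClone"
  case Some(_: WriteIntoDelta) => "checkPathEmpty, then handleCreateTableAsSelect"
  case Some(_)                 => "checkPathEmpty, then handleCreateTableAsSelect with a new WriteIntoDelta"
  case None                    => "handleCreateTable"
}
```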
### CloneTableCommand

`handleCommit` checkPathEmpty.

`handleCommit` requests the CloneTableCommand to handleClone.
### WriteIntoDeltaLike

`handleCommit` checkPathEmpty.

`handleCommit` handleCreateTableAsSelect.
### Some Other Query

`handleCommit` checkPathEmpty.

`handleCommit` makes sure that the query is not a `RunnableCommand` (Spark SQL) or throws an `IllegalArgumentException`.

`handleCommit` handleCreateTableAsSelect with a new WriteIntoDelta.
### No Query

With no query specified, `handleCommit` handleCreateTable.
## Executing Post-Commit Updates

```scala
runPostCommitUpdates(
  sparkSession: SparkSession,
  txnUsedForCommit: OptimisticTransaction,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Unit
```

!!! note "Procedure"
    `runPostCommitUpdates` is a procedure (returns `Unit`) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

`runPostCommitUpdates` prints out the following INFO message to the logs:

```text
Table is path-based table: [tableByPath]. Update catalog with mode: [operation]
```

`runPostCommitUpdates` requests the given DeltaLog to update.

`runPostCommitUpdates` updates the catalog.

In the end, when the delta.universalFormat.enabledFormats table property contains `iceberg`, `runPostCommitUpdates` requests the `UniversalFormatConverter` to convertSnapshot.
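A minimal sketch of that final check (an assumption-laden simplification, not the actual Delta Lake sources; the sample `tableProperties` map is made up):

```scala
val tableProperties: Map[String, String] =
  Map("delta.universalFormat.enabledFormats" -> "iceberg")

// Universal Format conversion only kicks in when the comma-separated
// delta.universalFormat.enabledFormats table property lists iceberg
val icebergEnabled = tableProperties
  .get("delta.universalFormat.enabledFormats")
  .exists(_.split(",").map(_.trim).contains("iceberg"))

if (icebergEnabled) {
  // request the UniversalFormatConverter to convertSnapshot
}
```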
## Updating Table Catalog

```scala
updateCatalog(
  spark: SparkSession,
  table: CatalogTable,
  snapshot: Snapshot,
  didNotChangeMetadata: Boolean): Unit
```

!!! note "Procedure"
    `updateCatalog` is a procedure (returns `Unit`) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

!!! note "didNotChangeMetadata Not Used"
    The `didNotChangeMetadata` argument is not used.

`updateCatalog` prints out the following INFO message to the logs:

```text
Table is path-based table: [tableByPath]. Update catalog with mode: [operation]
```

`updateCatalog` requests the DeltaLog to update.

`updateCatalog` updates the catalog.

In the end, when the delta.universalFormat.enabledFormats table property contains `iceberg`, `updateCatalog` requests the `UniversalFormatConverter` to convertSnapshot.
## handleCreateTable

```scala
handleCreateTable(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  tableWithLocation: CatalogTable,
  fs: FileSystem,
  hadoopConf: Configuration): Unit
```

`handleCreateTable`...FIXME
## handleCreateTableAsSelect

```scala
handleCreateTableAsSelect(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  deltaLog: DeltaLog,
  deltaWriter: WriteIntoDeltaLike,
  tableWithLocation: CatalogTable): Unit
```

`handleCreateTableAsSelect`...FIXME
## Provided Metadata

```scala
getProvidedMetadata(
  table: CatalogTable,
  schemaString: String): Metadata
```

`getProvidedMetadata` gives a new Metadata with the values copied directly from the given `CatalogTable` (Spark SQL). This `Metadata` uses the given schema (`schemaString`) and has the clustering columns property removed from the table properties.

`getProvidedMetadata` creates a Metadata with the following:

| Metadata | Value |
|---|---|
| Description | The comment of the given `CatalogTable` (Spark SQL), if defined |
| Schema | The given `schemaString` |
| Partition Columns | The `partitionColumnNames` of the given `CatalogTable` (Spark SQL) |
| Table Configuration | The properties of the given `CatalogTable` (Spark SQL) with the clustering columns property removed |
| Created Time | The current time |

`getProvidedMetadata` is used when:

- `CreateDeltaTableCommand` is requested to handleCreateTable and replaceMetadataIfNecessary
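A hypothetical sketch of the mapping in the table above (not the actual Delta Lake sources; `MetadataSketch` stands in for Delta's `Metadata` action, and the `clusteringColumns` property key is an assumption):

```scala
final case class MetadataSketch(
    description: String,
    schemaString: String,
    partitionColumns: Seq[String],
    configuration: Map[String, String],
    createdTime: Option[Long])

def getProvidedMetadataSketch(
    comment: Option[String],           // CatalogTable.comment
    schemaString: String,
    partitionColumnNames: Seq[String], // CatalogTable.partitionColumnNames
    properties: Map[String, String]    // CatalogTable.properties
): MetadataSketch =
  MetadataSketch(
    description = comment.orNull,
    schemaString = schemaString,
    partitionColumns = partitionColumnNames,
    configuration = properties - "clusteringColumns", // assumed property key
    createdTime = Some(System.currentTimeMillis()))
```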
## Logging

Enable `ALL` logging level for the `org.apache.spark.sql.delta.commands.CreateDeltaTableCommand` logger to see what happens inside.

Add the following lines to `conf/log4j2.properties`:

```text
logger.CreateDeltaTableCommand.name = org.apache.spark.sql.delta.commands.CreateDeltaTableCommand
logger.CreateDeltaTableCommand.level = all
```

Refer to Logging.