
CreateDeltaTableCommand

CreateDeltaTableCommand is a LeafRunnableCommand (Spark SQL) to create a delta table (for DeltaCatalog).

CreateDeltaTableCommand is a DeltaCommand.
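For instance, with the DeltaCatalog registered as the session catalog (spark.sql.catalog.spark_catalog set to org.apache.spark.sql.delta.catalog.DeltaCatalog), CREATE TABLE statements with the delta data source end up as a CreateDeltaTableCommand. The table names below are made up for the demo:

```scala
// CREATE TABLE ... USING delta goes through DeltaCatalog,
// which builds and runs CreateDeltaTableCommand behind the scenes.
spark.sql("""
  CREATE TABLE demo_users (id LONG, name STRING)
  USING delta
""")

// The same applies to CTAS, in which case the command is also given
// a logical query plan (see Optional Logical Query Plan below).
spark.sql("""
  CREATE TABLE demo_users_copy
  USING delta
  AS SELECT * FROM demo_users
""")
```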

Creating Instance

CreateDeltaTableCommand takes the following to be created:

CreateDeltaTableCommand is created when:

Optional Logical Query Plan

query: Option[LogicalPlan]

CreateDeltaTableCommand can be given a LogicalPlan (Spark SQL) when created.

The LogicalPlan can be one of the following (that triggers a custom handling):

| Logical Query Plan | Handler |
|--------------------|---------|
| CloneTableCommand | handleClone |
| WriteIntoDeltaLike | handleCreateTableAsSelect |
| Some other LogicalPlan | handleCreateTableAsSelect |
| Undefined (None) | handleCreateTable |

CreationMode

CreateDeltaTableCommand can be given a CreationMode when created:

  • Create (default)
  • CreateOrReplace
  • Replace

CreationMode is Create by default or specified by DeltaCatalog.
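As a rough mapping (the exact CreationMode is decided by DeltaCatalog), the SQL statement determines the mode:

```scala
// Create (the default)
spark.sql("CREATE TABLE t (id LONG) USING delta")

// CreateOrReplace
spark.sql("CREATE OR REPLACE TABLE t (id LONG) USING delta")

// Replace (fails unless the table already exists)
spark.sql("REPLACE TABLE t (id LONG) USING delta")
```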

Executing Command

RunnableCommand
run(
  sparkSession: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run...FIXME

updateCatalog

updateCatalog(
  spark: SparkSession,
  table: CatalogTable): Unit

updateCatalog uses the given SparkSession to access SessionCatalog to createTable or alterTable when the tableByPath flag is off. Otherwise, updateCatalog does nothing.
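The following is a self-contained sketch of that dispatch (the local CreationMode stand-in and the explicit tableByPath / operation parameters are for illustration only; the real command keeps them as internal state):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Local stand-in for the command's CreationMode (for illustration only)
sealed trait CreationMode
case object Create extends CreationMode
case object CreateOrReplace extends CreationMode
case object Replace extends CreationMode

// Sketch only (not the actual implementation)
def updateCatalogSketch(
    spark: SparkSession,
    table: CatalogTable,
    tableByPath: Boolean,
    operation: CreationMode): Unit = {
  if (!tableByPath) {
    operation match {
      case Create =>
        spark.sessionState.catalog.createTable(
          table, ignoreIfExists = false, validateLocation = false)
      case CreateOrReplace | Replace =>
        spark.sessionState.catalog.alterTable(table)
    }
  } // do nothing for path-based tables
}
```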

getOperation

getOperation(
  metadata: Metadata,
  isManagedTable: Boolean,
  options: Option[DeltaOptions]): DeltaOperations.Operation

getOperation...FIXME

replaceMetadataIfNecessary

replaceMetadataIfNecessary(
  txn: OptimisticTransaction,
  tableDesc: CatalogTable,
  options: DeltaOptions,
  schema: StructType): Unit
tableDesc Unused

The tableDesc argument is not used.

replaceMetadataIfNecessary determines whether or not it is a replace operation (i.e., CreateOrReplace or Replace based on the CreationMode).

replaceMetadataIfNecessary then determines whether the schema of the Delta table is explicitly not supposed to be overwritten (i.e., the overwriteSchema option in the input DeltaOptions is disabled).

In the end, only for a CreateOrReplace or Replace operation on an existing delta table (and unless the overwriteSchema option is explicitly disabled), replaceMetadataIfNecessary updates the metadata (on the given OptimisticTransaction) with the given schema.

DeltaIllegalArgumentException

replaceMetadataIfNecessary throws a DeltaIllegalArgumentException for a CreateOrReplace or Replace operation with the overwriteSchema option disabled:

The usage of overwriteSchema is not allowed when replacing a Delta table.
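A condensed, self-contained sketch of this control flow (plain parameters stand in for the command's internal state; the real command throws a DeltaIllegalArgumentException and builds the new Metadata with getProvidedMetadata):

```scala
// Sketch only (not the actual implementation).
// isReplace: CreationMode is CreateOrReplace or Replace
// overwriteSchema: the overwriteSchema option, if explicitly specified
// tableExists: the transaction reads an existing table (readVersion > -1)
def replaceMetadataIfNecessarySketch(
    isReplace: Boolean,
    overwriteSchema: Option[Boolean],
    tableExists: Boolean): Unit = {
  val dontOverwriteSchema = overwriteSchema.contains(false)
  if (isReplace && dontOverwriteSchema) {
    // a DeltaIllegalArgumentException in the real command
    throw new IllegalArgumentException(
      "The usage of overwriteSchema is not allowed when replacing a Delta table.")
  }
  if (tableExists && isReplace && !dontOverwriteSchema) {
    // replace the table metadata on the OptimisticTransaction with the given schema
    // (getProvidedMetadata + metadata update in the real command)
  }
}
```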

Handling Transaction Commit

handleCommit(
  sparkSession: SparkSession,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Seq[Row]

handleCommit starts a transaction.

handleCommit executes one of the following (based on the optional query) to handle the query and give the result to be returned:

  • CloneTableCommand
  • WriteIntoDeltaLike
  • Some Other Query
  • No Query

In the end, handleCommit runs post-commit updates.

CloneTableCommand

handleCommit checkPathEmpty.

handleCommit requests the CloneTableCommand to handleClone.

WriteIntoDeltaLike

handleCommit checkPathEmpty.

handleCommit handleCreateTableAsSelect.

Some Other Query

handleCommit checkPathEmpty.

handleCommit makes sure that the query is not a RunnableCommand (Spark SQL) or throws an IllegalArgumentException.

handleCommit handleCreateTableAsSelect with a new WriteIntoDelta.

No Query

With no query specified, handleCommit handleCreateTable.
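Putting the four cases together, the dispatch can be sketched as a pattern match on the optional query (only the branching is shown; checkPathEmpty and the handle* methods are internal to the command):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.commands.{CloneTableCommand, WriteIntoDeltaLike}
import org.apache.spark.sql.execution.command.RunnableCommand

// Sketch only: which handler handleCommit picks for the optional query
def dispatchSketch(query: Option[LogicalPlan]): String = query match {
  case Some(_: CloneTableCommand)  => "checkPathEmpty + handleClone"
  case Some(_: WriteIntoDeltaLike) => "checkPathEmpty + handleCreateTableAsSelect"
  case Some(other) =>
    require(!other.isInstanceOf[RunnableCommand],
      s"Query should not be a RunnableCommand: $other")
    "checkPathEmpty + handleCreateTableAsSelect (with a new WriteIntoDelta)"
  case None => "handleCreateTable"
}
```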

Executing Post-Commit Updates

runPostCommitUpdates(
  sparkSession: SparkSession,
  txnUsedForCommit: OptimisticTransaction,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Unit
Procedure

runPostCommitUpdates is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

runPostCommitUpdates prints out the following INFO message to the logs:

Table is path-based table: [tableByPath]. Update catalog with mode: [operation]

runPostCommitUpdates requests the given DeltaLog to update.

runPostCommitUpdates updates the catalog.

In the end, when delta.universalFormat.enabledFormats table property contains iceberg, runPostCommitUpdates requests the UniversalFormatConverter to convertSnapshot.
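A minimal sketch of these post-commit steps (the catalog update and the Iceberg conversion are left as comments as they go through internal APIs):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.DeltaLog

// Sketch only (not the actual implementation)
def runPostCommitUpdatesSketch(
    spark: SparkSession,
    deltaLog: DeltaLog,
    tableWithLocation: CatalogTable): Unit = {
  // 1. Refresh the DeltaLog to the post-commit state of the table
  val postCommitSnapshot = deltaLog.update()
  // 2. Create or alter the table in the session catalog (updateCatalog)
  // 3. When delta.universalFormat.enabledFormats contains "iceberg",
  //    convert the post-commit snapshot to Iceberg metadata
}
```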

Updating Table Catalog

updateCatalog(
  spark: SparkSession,
  table: CatalogTable,
  snapshot: Snapshot,
  didNotChangeMetadata: Boolean): Unit
Procedure

updateCatalog is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

didNotChangeMetadata Not Used

The didNotChangeMetadata argument is not used.

updateCatalog prints out the following INFO message to the logs:

Table is path-based table: [tableByPath]. Update catalog with mode: [operation]

updateCatalog requests the given DeltaLog to update.

updateCatalog updates the catalog.

In the end, when delta.universalFormat.enabledFormats table property contains iceberg, updateCatalog requests the UniversalFormatConverter to convertSnapshot.

handleCreateTable

handleCreateTable(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  tableWithLocation: CatalogTable,
  fs: FileSystem,
  hadoopConf: Configuration): Unit

handleCreateTable...FIXME

handleCreateTableAsSelect

handleCreateTableAsSelect(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  deltaLog: DeltaLog,
  deltaWriter: WriteIntoDeltaLike,
  tableWithLocation: CatalogTable): Unit

handleCreateTableAsSelect...FIXME

Provided Metadata

getProvidedMetadata(
  table: CatalogTable,
  schemaString: String): Metadata

getProvidedMetadata gives a new Metadata with values copied directly from the given CatalogTable (Spark SQL).

The Metadata uses the given schema (schemaString) and has the clustering columns property removed from the table properties.


getProvidedMetadata creates a Metadata with the following:

| Metadata | Value |
|----------|-------|
| Description | The comment of the given CatalogTable (Spark SQL), if defined |
| Schema | The given schemaString |
| Partition Columns | The partitionColumnNames of the given CatalogTable (Spark SQL) |
| Table Configuration | The properties of the given CatalogTable (Spark SQL) with the clustering columns property removed |
| Created Time | The current time |
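A rough sketch of what the table above amounts to (not the actual implementation; the clustering-columns cleanup of the table properties is omitted):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Metadata

// Sketch only: assemble a Metadata from the CatalogTable and the schema string
def getProvidedMetadataSketch(table: CatalogTable, schemaString: String): Metadata =
  Metadata(
    description = table.comment.orNull,
    schemaString = schemaString,
    partitionColumns = table.partitionColumnNames,
    configuration = table.properties, // minus the clustering columns property
    createdTime = Some(System.currentTimeMillis()))
```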

getProvidedMetadata is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.CreateDeltaTableCommand logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.CreateDeltaTableCommand.name = org.apache.spark.sql.delta.commands.CreateDeltaTableCommand
logger.CreateDeltaTableCommand.level = all

Refer to Logging.