
CreateDeltaTableCommand

CreateDeltaTableCommand is a LeafRunnableCommand (Spark SQL) to create a delta table (for DeltaCatalog).

CreateDeltaTableCommand is a DeltaCommand.
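
A CREATE TABLE statement over the delta data source (handled by DeltaCatalog) is eventually executed as a CreateDeltaTableCommand. A minimal example (the table name and schema are made up), assuming a SparkSession with the Delta SQL extensions and DeltaCatalog configured:

spark.sql("""
  CREATE TABLE demo_users (id LONG, name STRING)
  USING delta
""")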

Creating Instance

CreateDeltaTableCommand takes the following to be created:

  • CatalogTable (Spark SQL)
  • Existing CatalogTable (if available)
  • SaveMode
  • Optional Data Query (LogicalPlan)
  • CreationMode
  • tableByPath flag (default: false)
  • Output attributes
  • Optional Protocol

CreateDeltaTableCommand is created when DeltaCatalog is requested to create a delta table.

CreationMode

CreateDeltaTableCommand can be given a CreationMode when created:

  • Create (default)
  • CreateOrReplace
  • Replace

CreationMode is Create by default or specified by DeltaCatalog.
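
For illustration, the three modes line up with the DeltaTable builder API (assuming a SparkSession in scope as spark and a made-up table name), which is expected to end up as a CreateDeltaTableCommand with the corresponding CreationMode:

import io.delta.tables.DeltaTable

// CreationMode: Create
DeltaTable.create(spark).tableName("demo").addColumn("id", "LONG").execute()

// CreationMode: CreateOrReplace
DeltaTable.createOrReplace(spark).tableName("demo").addColumn("id", "LONG").execute()

// CreationMode: Replace (the table must already exist)
DeltaTable.replace(spark).tableName("demo").addColumn("id", "LONG").execute()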

Executing Command

RunnableCommand
run(
  sparkSession: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) abstraction.

run...FIXME

updateCatalog

updateCatalog(
  spark: SparkSession,
  table: CatalogTable): Unit

updateCatalog uses the given SparkSession to access SessionCatalog to createTable or alterTable when the tableByPath flag is off. Otherwise, updateCatalog does nothing.
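
A minimal sketch of that branching (a simplified isCreate flag stands in for the actual CreationMode handling; not the actual Delta implementation):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

def updateCatalogSketch(
    spark: SparkSession,
    table: CatalogTable,
    tableByPath: Boolean,
    isCreate: Boolean): Unit = {
  if (tableByPath) {
    // path-based tables are not registered in the metastore
  } else if (isCreate) {
    // register a new table in the session catalog
    spark.sessionState.catalog.createTable(table, ignoreIfExists = false)
  } else {
    // update the existing table definition
    spark.sessionState.catalog.alterTable(table)
  }
}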

getOperation

getOperation(
  metadata: Metadata,
  isManagedTable: Boolean,
  options: Option[DeltaOptions]): DeltaOperations.Operation

getOperation...FIXME

replaceMetadataIfNecessary

replaceMetadataIfNecessary(
  txn: OptimisticTransaction,
  tableDesc: CatalogTable,
  options: DeltaOptions,
  schema: StructType): Unit
tableDesc Unused

tableDesc argument is not used.

replaceMetadataIfNecessary determines whether or not it is a replace operation (i.e., CreateOrReplace or Replace based on the CreationMode).

replaceMetadataIfNecessary determines whether the schema is explicitly not to be overwritten (i.e., the overwriteSchema option in the given DeltaOptions is specified and disabled).

In the end, only for a CreateOrReplace or Replace operation on an existing delta table with the overwriteSchema option not explicitly disabled, replaceMetadataIfNecessary updates the metadata (on the given OptimisticTransaction) with the given schema.

DeltaIllegalArgumentException

replaceMetadataIfNecessary throws a DeltaIllegalArgumentException for a CreateOrReplace or Replace operation with the overwriteSchema option explicitly disabled:

The usage of overwriteSchema is not allowed when replacing a Delta table.
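
Putting the two conditions together, a minimal sketch of the decision (simplified, made-up flags stand in for the actual CreationMode and DeltaOptions handling, and a plain IllegalArgumentException stands in for DeltaIllegalArgumentException):

// Returns true when the transaction's metadata should be replaced with the new schema
def shouldReplaceMetadata(
    isReplace: Boolean,               // CreateOrReplace or Replace
    overwriteSchemaDisabled: Boolean, // overwriteSchema explicitly set to false
    tableExists: Boolean): Boolean = {
  if (isReplace && overwriteSchemaDisabled) {
    throw new IllegalArgumentException(
      "The usage of overwriteSchema is not allowed when replacing a Delta table.")
  }
  tableExists && isReplace && !overwriteSchemaDisabled
}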

Handling Transaction Commit

handleCommit(
  sparkSession: SparkSession,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Seq[Row]

handleCommit starts a transaction.

handleCommit executes the logic to handle the query (that gives the result to be returned).

In the end, handleCommit runs runPostCommitUpdates.
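
A minimal sketch of that flow (the function-valued parameters are hypothetical stand-ins for the actual transaction and handling logic):

import org.apache.spark.sql.Row
import org.apache.spark.sql.delta.OptimisticTransaction

def handleCommitSketch(
    startTransaction: () => OptimisticTransaction,
    handleQuery: OptimisticTransaction => Seq[Row],
    runPostCommitUpdates: OptimisticTransaction => Unit): Seq[Row] = {
  val txn = startTransaction()   // start a transaction
  val result = handleQuery(txn)  // execute the command-handling logic
  runPostCommitUpdates(txn)      // run the post-commit updates
  result
}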

CREATE TABLE AS SELECT

handleCreateTableAsSelect(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  deltaLog: DeltaLog,
  deltaWriter: WriteIntoDeltaLike,
  tableWithLocation: CatalogTable): Unit
Procedure

handleCreateTableAsSelect is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

handleCreateTableAsSelect...FIXME
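
For example (with made-up table and source names), a CREATE TABLE ... AS SELECT over the delta data source takes this code path:

spark.sql("""
  CREATE TABLE demo_archive
  USING delta
  AS SELECT * FROM demo_users WHERE id > 100
""")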

CREATE TABLE

handleCreateTable(
  sparkSession: SparkSession,
  txn: OptimisticTransaction,
  tableWithLocation: CatalogTable,
  fs: FileSystem,
  hadoopConf: Configuration): Unit
Procedure

handleCreateTable is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

handleCreateTable...FIXME
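
For example (with a made-up table name and location), a CREATE TABLE without a data query, e.g. an external delta table at a given location, takes this code path:

spark.sql("""
  CREATE TABLE demo_events (id LONG, ts TIMESTAMP)
  USING delta
  LOCATION '/tmp/delta/demo_events'
""")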

Executing Post-Commit Updates

runPostCommitUpdates(
  sparkSession: SparkSession,
  txnUsedForCommit: OptimisticTransaction,
  deltaLog: DeltaLog,
  tableWithLocation: CatalogTable): Unit
Procedure

runPostCommitUpdates is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

runPostCommitUpdates prints out the following INFO message to the logs:

Table is path-based table: [tableByPath]. Update catalog with mode: [operation]

runPostCommitUpdates requests the given DeltaLog to update.

runPostCommitUpdates updates the catalog.

In the end, when delta.universalFormat.enabledFormats table property contains iceberg, runPostCommitUpdates requests the UniversalFormatConverter to convertSnapshot.
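
A minimal sketch of that sequence (the catalog update and iceberg conversion are simplified function-valued stand-ins, not the actual Delta API):

import org.apache.spark.sql.delta.{DeltaLog, Snapshot}

def runPostCommitUpdatesSketch(
    deltaLog: DeltaLog,
    updateCatalog: Snapshot => Unit,
    convertToIceberg: Snapshot => Unit): Unit = {
  val postCommitSnapshot = deltaLog.update()   // request the DeltaLog to update
  updateCatalog(postCommitSnapshot)            // update the catalog
  val enabledFormats = postCommitSnapshot.metadata.configuration
    .getOrElse("delta.universalFormat.enabledFormats", "")
  if (enabledFormats.contains("iceberg")) {    // UniForm with Iceberg enabled
    convertToIceberg(postCommitSnapshot)
  }
}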

Updating Table Catalog

updateCatalog(
  spark: SparkSession,
  table: CatalogTable,
  snapshot: Snapshot,
  didNotChangeMetadata: Boolean): Unit
Procedure

updateCatalog is a procedure (returns Unit) so what happens inside stays inside (paraphrasing the former advertising slogan of Las Vegas, Nevada).

didNotChangeMetadata Not Used

updateCatalog requests the SessionCatalog (of the given SparkSession) to createTable or alterTable with the given CatalogTable (based on the CreationMode).

updateCatalog does nothing (the metastore is left untouched) when the tableByPath flag is on.

Provided Metadata

getProvidedMetadata(
  table: CatalogTable,
  schemaString: String): Metadata

getProvidedMetadata gives a new Metadata with values copied directly from the given CatalogTable (Spark SQL).

This Metadata has the clustering columns property removed from the table properties and uses the given schema (schemaString).


getProvidedMetadata creates a Metadata with the following:

  • Description: the comment of the given CatalogTable (Spark SQL), if defined
  • Schema: the given schemaString
  • Partition Columns: the partitionColumnNames of the given CatalogTable (Spark SQL)
  • Table Configuration: the properties of the given CatalogTable (Spark SQL) with the clustering columns property removed
  • Created Time: the current time
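
A minimal sketch of assembling such a Metadata (assuming Delta's actions.Metadata case class; the clustering-columns property key is a made-up placeholder):

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Metadata

def providedMetadataSketch(table: CatalogTable, schemaString: String): Metadata =
  Metadata(
    description = table.comment.orNull,                     // Description
    schemaString = schemaString,                             // Schema
    partitionColumns = table.partitionColumnNames,           // Partition Columns
    configuration = table.properties - "clusteringColumns",  // Table Configuration (placeholder key)
    createdTime = Some(System.currentTimeMillis()))          // Created Time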

getProvidedMetadata is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.delta.commands.CreateDeltaTableCommand logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.CreateDeltaTableCommand.name = org.apache.spark.sql.delta.commands.CreateDeltaTableCommand
logger.CreateDeltaTableCommand.level = all

Refer to Logging.