ConvertToDeltaCommand (ConvertToDeltaCommandBase)

ConvertToDeltaCommand is a DeltaCommand that converts a parquet table to delta format.

ConvertToDeltaCommand represents the following high-level operators:

  • CONVERT TO DELTA SQL command
  • DeltaTable.convertToDelta operator

ConvertToDeltaCommand is a LeafRunnableCommand (Spark SQL).

ConvertToDeltaCommand requires that the partition schema matches the partitions of the parquet table (or an AnalysisException is thrown).

ConvertToDeltaCommand records collectStats in the Convert operation to indicate whether both the collectStats flag and the spark.databricks.delta.stats.collect configuration property were enabled.
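
As a quick illustration, the conversion can be triggered with the CONVERT TO DELTA SQL command or the DeltaTable.convertToDelta operator. The following is a minimal sketch (the table path and the partition column are made-up examples):

import io.delta.tables.DeltaTable

// CONVERT TO DELTA SQL command (over a partitioned parquet table)
spark.sql("CONVERT TO DELTA parquet.`/tmp/events` PARTITIONED BY (date DATE)")

// Equivalent DeltaTable.convertToDelta operator
DeltaTable.convertToDelta(spark, "parquet.`/tmp/events`", "date DATE")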

Creating Instance

ConvertToDeltaCommand takes the following to be created:

  • Table Identifier
  • Partition schema (optional)
  • collectStats flag
  • Path of the delta table (optional)

ConvertToDeltaCommand is created when:

  • DeltaSqlAstBuilder is requested to parse the CONVERT TO DELTA SQL command
  • DeltaConvert utility is used to convert a parquet table to delta format (for the DeltaTable.convertToDelta operator)

collectStats

ConvertToDeltaCommand is given a collectStats flag when created. The flag is used (together with the spark.databricks.delta.stats.collect configuration property) when getOperation creates the Convert operation.

Executing Command

RunnableCommand
run(
  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand (Spark SQL) contract.

run creates a ConvertTarget (conversion properties) from the TableIdentifier (with the given SparkSession).

run makes sure that the (data source) provider (the database part of the TableIdentifier) is either delta or parquet. For all other data source providers, run throws an AnalysisException:

CONVERT TO DELTA only supports parquet tables, but you are trying to convert a [sourceName] source: [ident]

For the delta data source provider, run simply prints out the following message to standard output and returns.

The table you are trying to convert is already a delta table

For the parquet data source provider, run uses the DeltaLog utility to create a DeltaLog. run then requests the DeltaLog to update and starts a new transaction.

In case the readVersion of the new transaction is greater than -1 (the table has already been converted), run simply prints out the following message to standard output and returns:

The table you are trying to convert is already a delta table

Otherwise, run performConvert.
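
For example (with a made-up path), re-running the conversion over the same directory reports that the table is already a delta table and changes nothing:

spark.sql("CONVERT TO DELTA parquet.`/tmp/events`") // converts the parquet table
spark.sql("CONVERT TO DELTA parquet.`/tmp/events`") // prints out the message above and returns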

performConvert

performConvert(
  spark: SparkSession,
  txn: OptimisticTransaction,
  convertProperties: ConvertTarget): Seq[Row]

performConvert makes sure that the directory exists (the location of the given ConvertTarget, i.e. the table part of the TableIdentifier of the command).

performConvert requests the OptimisticTransaction for the DeltaLog that is then requested to ensureLogDirectoryExist.

performConvert creates a Dataset to recursively list directories and files under the directory and keeps only files (by filtering out directories using a WHERE clause).

Note

performConvert uses Dataset API to build a distributed computation to query files.

performConvert caches the Dataset of file names.
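
The following is a minimal sketch of the technique (not the actual implementation): building a Dataset of file entries, filtering out directories, and caching the result. FileInfo and listAllFiles are illustrative names, and the listing happens on the driver here for simplicity, whereas performConvert distributes the work:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, SparkSession}

final case class FileInfo(path: String, length: Long, isDir: Boolean)

def listAllFiles(spark: SparkSession, dir: String): Dataset[FileInfo] = {
  import spark.implicits._
  val hadoopConf = spark.sessionState.newHadoopConf()
  // Recursively collect the statuses of all directories and files under dir
  def list(p: Path): Seq[FileInfo] = {
    val fs = p.getFileSystem(hadoopConf)
    fs.listStatus(p).toSeq.flatMap { st =>
      val entry = FileInfo(st.getPath.toString, st.getLen, st.isDirectory)
      if (st.isDirectory) entry +: list(st.getPath) else Seq(entry)
    }
  }
  // Keep files only (filter out directories, akin to a WHERE clause) and cache
  list(new Path(dir)).toDS().where(!$"isDir").cache()
}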

performConvert uses the spark.databricks.delta.import.batchSize.schemaInference configuration property as the number of files per batch for schema inference. performConvert mergeSchemasInParallel for every batch of files and then merges the per-batch schemas (mergeSchemas).

performConvert constructTableSchema using the inferred table schema and the partitionSchema (if specified).
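
A rough sketch of combining an inferred data schema with partition fields (a plausible approximation, not the actual constructTableSchema; the columns are made up):

import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("value", DoubleType)))
val partitionFields = Seq(StructField("date", DateType))

// Drop data columns that clash with partition columns and append the partition columns
val tableSchema = StructType(
  dataSchema.filterNot { f =>
    partitionFields.exists(_.name.equalsIgnoreCase(f.name))
  } ++ partitionFields)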

performConvert creates a new Metadata using the table schema and the partitionSchema (if specified).

performConvert requests the OptimisticTransaction to update the metadata.

performConvert uses the spark.databricks.delta.import.batchSize.statsCollection configuration property as the number of files per batch for stats collection. performConvert creates an AddFile (in the data path of the DeltaLog of the OptimisticTransaction) for every file in a batch.
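
Both batch sizes can be tuned via the configuration properties mentioned above, e.g. (the values are illustrative only):

// Number of files per batch for schema inference and for stats collection
spark.conf.set("spark.databricks.delta.import.batchSize.schemaInference", "1000")
spark.conf.set("spark.databricks.delta.import.batchSize.statsCollection", "50000")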

In the end, performConvert streamWrite (with the OptimisticTransaction, the AddFiles, and the Convert operation) and unpersists the Dataset of file names.

checkColumnMapping

checkColumnMapping(
  txnMetadata: Metadata,
  convertTargetTable: ConvertTargetTable): Unit

checkColumnMapping throws a DeltaColumnMappingUnsupportedException when the requiredColumnMappingMode of the given ConvertTargetTable is not the DeltaColumnMappingMode of the given Metadata.

streamWrite

streamWrite(
  spark: SparkSession,
  txn: OptimisticTransaction,
  addFiles: Iterator[AddFile],
  op: DeltaOperations.Operation,
  numFiles: Long): Long

streamWrite...FIXME

createAddFile

createAddFile(
  file: SerializableFileStatus,
  basePath: Path,
  fs: FileSystem,
  conf: SQLConf): AddFile

createAddFile creates an AddFile action.

Internally, createAddFile...FIXME

createAddFile throws an AnalysisException if the number of fields in the given partition schema does not match the number of partitions found (at partition discovery phase):

Expecting [size] partition column(s): [expectedCols], but found [size] partition column(s): [parsedCols] from parsing the file name: [path]
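
For example (with a made-up layout), converting a directory that has a single date=... partition directory per file while declaring two partition columns triggers the exception:

// Files live under /tmp/events/date=2020-01-01/part-00000.parquet (one partition column),
// yet two partition columns are declared below
spark.sql("CONVERT TO DELTA parquet.`/tmp/events` PARTITIONED BY (date DATE, hour INT)")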

mergeSchemasInParallel

mergeSchemasInParallel(
  sparkSession: SparkSession,
  filesToTouch: Seq[FileStatus],
  serializedConf: SerializableConfiguration): Option[StructType]

mergeSchemasInParallel...FIXME

constructTableSchema

constructTableSchema(
  spark: SparkSession,
  dataSchema: StructType,
  partitionFields: Seq[StructField]): StructType

constructTableSchema...FIXME

createDeltaActions

createDeltaActions(
  spark: SparkSession,
  manifest: ConvertTargetFileManifest,
  partitionSchema: StructType,
  txn: OptimisticTransaction,
  fs: FileSystem): Iterator[AddFile]

createDeltaActions...FIXME

getOperation

getOperation(
  numFilesConverted: Long,
  convertProperties: ConvertTarget,
  sourceFormat: String): DeltaOperations.Operation

getOperation creates a Convert operation with the following properties:

  • numFiles: the number of files in the target table
  • partitionBy: the partitionSchema of the command
  • collectStats: whether both the collectStats flag and spark.databricks.delta.stats.collect are enabled
  • catalogTable: the CatalogTable of the given ConvertTarget (if defined)
  • sourceFormat: the given sourceFormat

ConvertToDeltaCommandBase

ConvertToDeltaCommandBase is the base of ConvertToDeltaCommand-like commands with the only known implementation being ConvertToDeltaCommand itself.

isCatalogTable

Signature
isCatalogTable(
  analyzer: Analyzer,
  tableIdent: TableIdentifier): Boolean

isCatalogTable is part of the DeltaCommand abstraction.

isCatalogTable...FIXME

getTargetTable

getTargetTable(
  spark: SparkSession,
  target: ConvertTarget): ConvertTargetTable

getTargetTable...FIXME


getTargetTable is used when ConvertToDeltaCommand is executed.

spark.databricks.delta.stats.collect

statsEnabled: Boolean

statsEnabled is the value of spark.databricks.delta.stats.collect configuration property.

Lazy Value

statsEnabled is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
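
A tiny sketch of the semantics (not the actual statsEnabled, which reads the configuration property above):

lazy val statsEnabled: Boolean = {
  println("evaluating statsEnabled") // executed once only
  true
}

statsEnabled // prints the message and computes the value
statsEnabled // returns the memoized value with no re-evaluation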


statsEnabled is used (together with the collectStats flag) when getOperation creates the Convert operation.