
ConvertToDeltaCommand

ConvertToDeltaCommand is a DeltaCommand that converts a parquet table into delta format (imports it into Delta).

ConvertToDeltaCommand is a Spark SQL RunnableCommand (and executed eagerly on the driver for side-effects).

ConvertToDeltaCommand requires that the given partition schema matches the partitions of the parquet table (or an AnalysisException is thrown).
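
For example, converting a partitioned parquet table with SQL could look as follows. This is a hedged sketch: the path, the partition column, and the session configuration are illustrative, and the Delta SQL extension must be enabled for the statement to parse.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("convert-to-delta-demo")
  .master("local[*]")
  // Required so that CONVERT TO DELTA is handled by the Delta SQL parser
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .getOrCreate()

// The PARTITIONED BY clause must match the actual partitioning of the parquet table
// or ConvertToDeltaCommand throws an AnalysisException.
spark.sql("CONVERT TO DELTA parquet.`/tmp/events` PARTITIONED BY (date DATE)")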

ConvertToDeltaCommandBase is the base of ConvertToDeltaCommand-like commands, with ConvertToDeltaCommand being the only known implementation.

Creating Instance

ConvertToDeltaCommand takes the following to be created:

  • Parquet table (TableIdentifier)
  • Partition schema (Option[StructType])
  • Delta Path (Option[String])

ConvertToDeltaCommand is created when:

  • DeltaConvert utility is requested to convert a parquet table to delta format (the DeltaTable.convertToDelta operator), as shown in the sketch after this list
  • DeltaSqlAstBuilder is requested to parse the CONVERT TO DELTA SQL command
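
The DeltaConvert path can be exercised with the DeltaTable API. A minimal sketch, assuming an active SparkSession in scope and an illustrative path and partition column:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.types.StructType

// `spark` is an active SparkSession (e.g. the one from the earlier example)
val partitionSchema = new StructType().add("date", "date")

// Programmatic equivalent of CONVERT TO DELTA ... PARTITIONED BY (date DATE)
DeltaTable.convertToDelta(spark, "parquet.`/tmp/events`", partitionSchema)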

Executing Command

run(
  spark: SparkSession): Seq[Row]

run is part of the RunnableCommand contract.

run resolves the TableIdentifier of the parquet table into a ConvertTarget (the convert properties) with the given SparkSession.

run makes sure that the (data source) provider (the database part of the TableIdentifier) is either delta or parquet. For any other data source provider, run throws an AnalysisException:

CONVERT TO DELTA only supports parquet tables, but you are trying to convert a [sourceName] source: [ident]

For the delta data source provider, run simply prints out the following message to standard output and returns.

The table you are trying to convert is already a delta table

For the parquet data source provider, run uses the DeltaLog utility to create a DeltaLog for the target directory. run then requests the DeltaLog to update (load the latest snapshot) and to start a new transaction (an OptimisticTransaction). In the end, run performConvert (with the SparkSession, the transaction, and the ConvertTarget).

In case the readVersion of the new transaction is greater than -1 (the directory is already a delta table), run skips the conversion, simply prints out the following message to standard output and returns.

The table you are trying to convert is already a delta table
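
Putting the above together, the control flow of run can be sketched as follows. This is a condensed, hypothetical sketch (not the actual implementation): runSketch is a made-up helper, the provider name and the table directory are passed in directly, and an IllegalArgumentException stands in for the AnalysisException the real command throws.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.delta.DeltaLog

def runSketch(spark: SparkSession, provider: String, tableDir: String): Seq[Row] =
  provider.toLowerCase match {
    case "delta" =>
      println("The table you are trying to convert is already a delta table")
      Seq.empty[Row]
    case "parquet" =>
      val deltaLog = DeltaLog.forTable(spark, tableDir) // create (or load) the DeltaLog of the directory
      deltaLog.update()                                 // bring the snapshot up to date
      val txn = deltaLog.startTransaction()             // open an OptimisticTransaction
      if (txn.readVersion > -1) {
        println("The table you are trying to convert is already a delta table")
        Seq.empty[Row]
      } else {
        // performConvert(spark, txn, convertProperties) would run here
        Seq.empty[Row]
      }
    case other =>
      // The real command throws an AnalysisException here
      throw new IllegalArgumentException(
        s"CONVERT TO DELTA only supports parquet tables, but you are trying to convert a $other source")
  }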

Internal Helper Methods

performConvert Method

performConvert(
  spark: SparkSession,
  txn: OptimisticTransaction,
  convertProperties: ConvertTarget): Seq[Row]

performConvert makes sure that the target directory exists (the directory comes from the given ConvertTarget and is the table part of the TableIdentifier of the command).

performConvert requests the OptimisticTransaction for the DeltaLog that is then requested to make sure that the transaction log directory exists (ensureLogDirectoryExist).

performConvert recursively lists the files in the directory and keeps only files (by filtering out directories with a WHERE clause).

NOTE: performConvert uses Dataset API to build a distributed computation to query files.
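
The following is an illustrative sketch only: the real command performs a recursive, distributed listing of the table directory, while this snippet simply shows the "keep files, drop directories" filtering with the Dataset API (FileEntry and listFilesOnly are made up for the example).

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, SparkSession}

case class FileEntry(path: String, length: Long, isDir: Boolean, modificationTime: Long)

def listFilesOnly(spark: SparkSession, dir: String): Dataset[FileEntry] = {
  import spark.implicits._
  val hadoopPath = new Path(dir)
  val fs = hadoopPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val entries = fs.listStatus(hadoopPath).toSeq.map { status =>
    FileEntry(status.getPath.toString, status.getLen, status.isDirectory, status.getModificationTime)
  }
  // Directories are filtered out with a WHERE clause
  entries.toDS().where(!$"isDir")
}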

performConvert caches the Dataset of file names.

performConvert uses the spark.databricks.delta.import.batchSize.schemaInference configuration property for the number of files per batch for schema inference. performConvert merges the schemas in parallel (mergeSchemasInParallel) for every batch of files and then merges the per-batch schemas into a single data schema.
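
A hedged sketch of the batching idea: group the file paths into batches of the schema-inference batch size, infer a schema per batch, and fold the per-batch schemas into one data schema. inferBatchSchema and inferDataSchema are stand-ins (the real command merges parquet footers in parallel and uses a proper schema-merging utility rather than the naive name-based union below).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

def inferBatchSchema(spark: SparkSession, paths: Seq[String]): StructType =
  spark.read.parquet(paths: _*).schema

def inferDataSchema(spark: SparkSession, allPaths: Seq[String], batchSize: Int): StructType =
  allPaths
    .grouped(batchSize)                              // one group per schema-inference batch
    .map(batch => inferBatchSchema(spark, batch))
    .reduceOption { (merged, next) =>
      // naive merge: append fields that are not in the merged schema yet
      StructType(merged.fields ++ next.fields.filterNot(f => merged.fieldNames.contains(f.name)))
    }
    .getOrElse(new StructType())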

performConvert constructs the table schema (constructTableSchema) using the inferred data schema and the partition schema (if specified).

performConvert creates a new Metadata using the table schema and the partition columns of the partition schema (if specified).

performConvert requests the OptimisticTransaction to update the metadata to the new Metadata.

performConvert uses the spark.databricks.delta.import.batchSize.statsCollection configuration property for the number of files per batch for stats collection. performConvert creates an AddFile action (createAddFile) (in the data path of the DeltaLog of the OptimisticTransaction) for every file in a batch.

In the end, performConvert does a streaming write (streamWrite) with the OptimisticTransaction, the AddFiles, and the Convert operation, and unpersists the Dataset of file names.

streamWrite Method

streamWrite(
  spark: SparkSession,
  txn: OptimisticTransaction,
  addFiles: Iterator[AddFile],
  op: DeltaOperations.Operation,
  numFiles: Long): Long

streamWrite...FIXME

createAddFile Method

createAddFile(
  file: SerializableFileStatus,
  basePath: Path,
  fs: FileSystem,
  conf: SQLConf): AddFile

createAddFile creates an AddFile action.

Internally, createAddFile...FIXME

createAddFile throws an AnalysisException if the number of fields in the given partition schema does not match the number of partition columns parsed from the file path (at partition discovery phase):

Expecting [size] partition column(s): [expectedCols], but found [size] partition column(s): [parsedCols] from parsing the file name: [path]
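
A hedged sketch of the partition-value handling in createAddFile: parse col=value segments from the file path, check them against the expected partition columns, and build an AddFile with the relative path, partition values, size, and modification time. createAddFileSketch and its parameters are simplifications (the real method takes a SerializableFileStatus, a FileSystem, and the SQLConf), and an IllegalArgumentException stands in for the AnalysisException above.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.AddFile

def createAddFileSketch(
    filePath: Path,
    fileSize: Long,
    modificationTime: Long,
    basePath: Path,
    expectedPartitionCols: Seq[String]): AddFile = {
  val relative = basePath.toUri.relativize(filePath.toUri).toString
  // e.g. date=2020-09-24/part-00000.parquet -> Seq("date" -> "2020-09-24")
  val parsed = relative.split("/").dropRight(1).flatMap { segment =>
    segment.split("=", 2) match {
      case Array(name, value) => Some(name -> value)
      case _                  => None
    }
  }
  if (parsed.length != expectedPartitionCols.length) {
    // The real method throws an AnalysisException with the message above
    throw new IllegalArgumentException(
      s"Expecting ${expectedPartitionCols.length} partition column(s): $expectedPartitionCols, " +
        s"but found ${parsed.length} partition column(s): ${parsed.map(_._1).toSeq} " +
        s"from parsing the file name: $filePath")
  }
  AddFile(relative, parsed.toMap, fileSize, modificationTime, dataChange = true)
}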

mergeSchemasInParallel Method

mergeSchemasInParallel(
  sparkSession: SparkSession,
  filesToTouch: Seq[FileStatus],
  serializedConf: SerializableConfiguration): Option[StructType]

mergeSchemasInParallel...FIXME
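
A hedged sketch only: let Spark's parquet reader do the distributed footer reading and schema merging. The real method reuses Spark's parquet schema-merging machinery with a serialized Hadoop configuration; the plain Configuration parameter below only mirrors the shape of the signature and is not used.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

def mergeSchemasSketch(
    spark: SparkSession,
    filesToTouch: Seq[FileStatus],
    hadoopConf: Configuration): Option[StructType] = {
  val paths = filesToTouch.map(_.getPath.toString)
  if (paths.isEmpty) None
  // mergeSchema makes Spark reconcile the footers of all the given files
  else Some(spark.read.option("mergeSchema", "true").parquet(paths: _*).schema)
}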

constructTableSchema Method

constructTableSchema(
  spark: SparkSession,
  dataSchema: StructType,
  partitionFields: Seq[StructField]): StructType

constructTableSchema...FIXME
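
A hedged sketch of the likely intent: the table schema is the data schema plus any partition fields that are not already part of it (partition columns come from directory names, so they are usually missing from the parquet footers). constructTableSchemaSketch is a stand-in, not the actual implementation.

import org.apache.spark.sql.types.{StructField, StructType}

def constructTableSchemaSketch(
    dataSchema: StructType,
    partitionFields: Seq[StructField]): StructType = {
  val dataFieldNames = dataSchema.fieldNames.map(_.toLowerCase).toSet
  val missingPartitionFields =
    partitionFields.filterNot(field => dataFieldNames.contains(field.name.toLowerCase))
  StructType(dataSchema.fields ++ missingPartitionFields)
}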

