# ConvertToDeltaCommand (ConvertToDeltaCommandBase)
`ConvertToDeltaCommand` is a DeltaCommand that converts a parquet table to delta format (imports it into Delta).

`ConvertToDeltaCommand` is a `RunnableCommand` (Spark SQL).

`ConvertToDeltaCommand` requires that the partition schema matches the partitions of the parquet table (or an `AnalysisException` is thrown).
## Creating Instance
`ConvertToDeltaCommand` takes the following to be created:

- Parquet table (`TableIdentifier`)
- Partition schema (`Option[StructType]`)
- Delta Path (`Option[String]`)
`ConvertToDeltaCommand` is created when:

- `CONVERT TO DELTA` statement is used (and `DeltaSqlAstBuilder` is requested to visitConvert)
- `DeltaTable.convertToDelta` utility is used (and `DeltaConvert` utility is used to executeConvert)
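Both entry points can be exercised from a Spark shell with Delta Lake on the classpath. The snippet below is a minimal sketch, assuming `spark` is an active `SparkSession` and `/tmp/parquet_table` is a hypothetical parquet table partitioned by `part`.

```scala
import io.delta.tables.DeltaTable

// 1) CONVERT TO DELTA statement (parsed by DeltaSqlAstBuilder.visitConvert)
spark.sql("CONVERT TO DELTA parquet.`/tmp/parquet_table` PARTITIONED BY (part INT)")

// 2) DeltaTable.convertToDelta utility (goes through DeltaConvert.executeConvert)
DeltaTable.convertToDelta(spark, "parquet.`/tmp/parquet_table`", "part INT")
```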
## Executing Command
```scala
run(
  spark: SparkSession): Seq[Row]
```
`run` is part of the `RunnableCommand` (Spark SQL) contract.

`run` creates a ConvertProperties from the TableIdentifier (with the given `SparkSession`).
`run` makes sure that the (data source) provider (the database part of the TableIdentifier) is either `delta` or `parquet`. For all other data source providers, `run` throws an `AnalysisException`:

```text
CONVERT TO DELTA only supports parquet tables, but you are trying to convert a [sourceName] source: [ident]
```
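That validation amounts to a simple check on the provider name. A sketch with illustrative names follows (the real command raises an `AnalysisException` built by Delta's error helpers, replaced here with a plain exception):

```scala
// A minimal sketch of the provider check in run (not the actual implementation).
def assertConvertibleProvider(sourceName: String, ident: String): Unit =
  if (!Seq("delta", "parquet").contains(sourceName.toLowerCase)) {
    // the real command throws an AnalysisException with this message
    throw new IllegalArgumentException(
      s"CONVERT TO DELTA only supports parquet tables, " +
        s"but you are trying to convert a $sourceName source: $ident")
  }
```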
For the `delta` data source provider, `run` simply prints out the following message to standard output and returns:

```text
The table you are trying to convert is already a delta table
```
For the `parquet` data source provider, `run` uses the `DeltaLog` utility to create a DeltaLog. `run` then requests the DeltaLog to update and starts a new transaction. In the end, `run` performConvert.

In case the readVersion of the new transaction is greater than -1, `run` simply prints out the following message to standard output and returns:

```text
The table you are trying to convert is already a delta table
```
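The readVersion guard can be captured by a helper like the one below (a sketch only; `OptimisticTransaction.readVersion` is `-1` when no delta log exists yet at the table location):

```scala
import org.apache.spark.sql.delta.OptimisticTransaction

// A sketch of the idempotency check run performs on the new transaction.
def alreadyDelta(txn: OptimisticTransaction): Boolean =
  if (txn.readVersion > -1) {
    println("The table you are trying to convert is already a delta table")
    true
  } else {
    false // not a delta table yet; proceed to performConvert
  }
```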
## performConvert
```scala
performConvert(
  spark: SparkSession,
  txn: OptimisticTransaction,
  convertProperties: ConvertTarget): Seq[Row]
```
`performConvert` makes sure that the directory exists (from the given ConvertProperties which is the table part of the TableIdentifier of the command).
`performConvert` requests the `OptimisticTransaction` for the DeltaLog that is then requested to ensureLogDirectoryExist.
`performConvert` creates a Dataset to recursively list directories and files in the directory and leaves only files (by filtering out directories using a `WHERE` clause).

Note

`performConvert` uses the Dataset API to build a distributed computation to query files.
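A simplified version of that distributed listing is sketched below. It stands in for the real helpers (such as SerializableFileStatus) with a plain case class and Hadoop's `FileSystem` API, but keeps the shape: parallelize the base directory, list it recursively on executors, and keep only files with a `WHERE`-style filter.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Dataset, SparkSession}

// Illustrative stand-in for SerializableFileStatus
case class FileEntry(path: String, length: Long, isDir: Boolean)

// Recursively collect all entries (files and directories) under dir.
def listRecursively(fs: FileSystem, dir: Path): Seq[FileEntry] = {
  val children = fs.listStatus(dir).toSeq
  children.map(s => FileEntry(s.getPath.toString, s.getLen, s.isDirectory)) ++
    children.filter(_.isDirectory).flatMap(s => listRecursively(fs, s.getPath))
}

// List all entries under basePath on the executors and leave only files.
def listFiles(spark: SparkSession, basePath: String): Dataset[FileEntry] = {
  import spark.implicits._
  spark
    .createDataset(Seq(basePath))
    .flatMap { dir =>
      val path = new Path(dir)
      listRecursively(path.getFileSystem(new Configuration()), path)
    }
    .where(!$"isDir") // filter out directories, keep files only
}
```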
`performConvert` caches the Dataset of file names.
`performConvert` uses the spark.databricks.delta.import.batchSize.schemaInference configuration property for the number of files per batch for schema inference. `performConvert` mergeSchemasInParallel for every batch of files and then mergeSchemas the per-batch schemas together.
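The batching itself is straightforward. In the sketch below, the per-batch inference and the schema merging are abstracted away as function parameters; both are hypothetical stand-ins for the command's mergeSchemasInParallel and mergeSchemas helpers.

```scala
import org.apache.spark.sql.types.StructType

// A sketch of the batched schema inference (helper parameters are stand-ins).
def inferTableSchema(
    filePaths: Seq[String],
    batchSize: Int, // spark.databricks.delta.import.batchSize.schemaInference
    inferBatchSchema: Seq[String] => Option[StructType], // ~ mergeSchemasInParallel
    mergeTwo: (StructType, StructType) => StructType // ~ mergeSchemas
): Option[StructType] =
  filePaths
    .grouped(batchSize) // one schema-inference round per batch of files
    .flatMap(inferBatchSchema)
    .reduceLeftOption(mergeTwo)
```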
`performConvert` constructTableSchema using the inferred table schema and the partitionSchema (if specified).
`performConvert` creates a new Metadata using the table schema and the partitionSchema (if specified).

`performConvert` requests the `OptimisticTransaction` to update the metadata.
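Put together, the schema and metadata assembly could be sketched as below. This is a simplification: the real constructTableSchema handles case sensitivity and column ordering more carefully, and the resulting `Metadata` is what gets passed to `OptimisticTransaction.updateMetadata`.

```scala
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.types.{StructField, StructType}

// A sketch of building the table schema and the new Metadata.
def buildMetadata(dataSchema: StructType, partitionFields: Seq[StructField]): Metadata = {
  // table schema = data columns plus any partition columns not already present
  val dataFieldNames = dataSchema.fieldNames.map(_.toLowerCase).toSet
  val tableSchema = StructType(
    dataSchema.fields ++
      partitionFields.filterNot(f => dataFieldNames.contains(f.name.toLowerCase)))
  Metadata(
    schemaString = tableSchema.json,
    partitionColumns = partitionFields.map(_.name))
}
```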
`performConvert` uses the spark.databricks.delta.import.batchSize.statsCollection configuration property for the number of files per batch for stats collection. `performConvert` creates an AddFile (in the data path of the DeltaLog of the `OptimisticTransaction`) for every file in a batch.
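Creating one AddFile for a listed parquet file boils down to relativizing its path against the table root and carrying over its size, modification time and the partition values parsed from its directory names. A sketch (names are illustrative; the real createAddFile works on a SerializableFileStatus and a Hadoop FileSystem):

```scala
import java.net.URI
import org.apache.spark.sql.delta.actions.AddFile

// A sketch of turning one listed file into an AddFile action.
def toAddFile(
    tableRoot: URI, // data path of the DeltaLog
    file: URI,
    partitionValues: Map[String, String], // e.g. Map("part" -> "1") parsed from part=1/
    size: Long,
    modificationTime: Long): AddFile =
  AddFile(
    path = tableRoot.relativize(file).toString,
    partitionValues = partitionValues,
    size = size,
    modificationTime = modificationTime,
    dataChange = true)
```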
In the end, `performConvert` streamWrite (with the `OptimisticTransaction`, the AddFiles, and the Convert operation) and unpersists the Dataset of file names.
## checkColumnMapping
```scala
checkColumnMapping(
  txnMetadata: Metadata,
  convertTargetTable: ConvertTargetTable): Unit
```
`checkColumnMapping` throws a DeltaColumnMappingUnsupportedException when the requiredColumnMappingMode of the given ConvertTargetTable is not the DeltaColumnMappingMode of the given Metadata.
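The guard is a single equality check on the two column mapping modes. A sketch follows (the real code throws a DeltaColumnMappingUnsupportedException built by Delta's error helpers rather than using `require`, and the message here is illustrative):

```scala
import org.apache.spark.sql.delta.DeltaColumnMappingMode

// A sketch of the column mapping compatibility check.
def assertColumnMappingMatches(
    requiredMode: DeltaColumnMappingMode, // from the ConvertTargetTable
    metadataMode: DeltaColumnMappingMode // from the transaction Metadata
): Unit =
  require(
    requiredMode == metadataMode,
    s"Convert requires column mapping mode $requiredMode, but found $metadataMode")
```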
## streamWrite
```scala
streamWrite(
  spark: SparkSession,
  txn: OptimisticTransaction,
  addFiles: Iterator[AddFile],
  op: DeltaOperations.Operation,
  numFiles: Long): Long
```
`streamWrite`...FIXME
## createAddFile
```scala
createAddFile(
  file: SerializableFileStatus,
  basePath: Path,
  fs: FileSystem,
  conf: SQLConf): AddFile
```
`createAddFile` creates an AddFile action.

Internally, `createAddFile`...FIXME
`createAddFile` throws an `AnalysisException` if the number of fields in the given partition schema does not match the number of partitions found (at partition discovery phase):

```text
Expecting [size] partition column(s): [expectedCols], but found [size] partition column(s): [parsedCols] from parsing the file name: [path]
```
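The check behind that error compares the number of user-specified partition columns with the number of partition columns parsed from the file path. A sketch (with the real `AnalysisException` replaced by a plain exception):

```scala
import org.apache.spark.sql.types.StructType

// A sketch of the partition-column count check in createAddFile.
def checkPartitionColumns(
    partitionSchema: StructType,
    parsedCols: Seq[String], // parsed from the file name, e.g. Seq("year", "month")
    path: String): Unit = {
  val expectedCols = partitionSchema.fieldNames.toSeq
  if (expectedCols.size != parsedCols.size) {
    // the real command throws an AnalysisException with this message
    throw new IllegalArgumentException(
      s"Expecting ${expectedCols.size} partition column(s): ${expectedCols.mkString(", ")}, " +
        s"but found ${parsedCols.size} partition column(s): ${parsedCols.mkString(", ")} " +
        s"from parsing the file name: $path")
  }
}
```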
## mergeSchemasInParallel
```scala
mergeSchemasInParallel(
  sparkSession: SparkSession,
  filesToTouch: Seq[FileStatus],
  serializedConf: SerializableConfiguration): Option[StructType]
```
`mergeSchemasInParallel`...FIXME
## constructTableSchema
```scala
constructTableSchema(
  spark: SparkSession,
  dataSchema: StructType,
  partitionFields: Seq[StructField]): StructType
```
`constructTableSchema`...FIXME
## ConvertToDeltaCommandBase
`ConvertToDeltaCommandBase` is the base of `ConvertToDeltaCommand`-like commands, with `ConvertToDeltaCommand` being the only known implementation.
### isCatalogTable
```scala
isCatalogTable(
  analyzer: Analyzer,
  tableIdent: TableIdentifier): Boolean
```
`isCatalogTable`...FIXME

`isCatalogTable` is part of the DeltaCommand abstraction.
### getTargetTable
```scala
getTargetTable(
  spark: SparkSession,
  target: ConvertTarget): ConvertTargetTable
```
`getTargetTable`...FIXME

`getTargetTable` is used when `ConvertToDeltaCommandBase` is executed.