ConvertToDeltaCommand¶
ConvertToDeltaCommand is a DeltaCommand that converts a parquet table into delta format (imports it into Delta).
ConvertToDeltaCommand is a RunnableCommand (Spark SQL) and is executed eagerly on the driver for side-effects.
ConvertToDeltaCommand requires that the partition schema matches the partitions of the parquet table (or an AnalysisException is thrown).
Creating Instance¶
ConvertToDeltaCommand takes the following to be created:

- Parquet table (TableIdentifier)
- Partition schema (Option[StructType])
- Delta Path (Option[String])
ConvertToDeltaCommand is created when:

- CONVERT TO DELTA statement is used (and DeltaSqlAstBuilder is requested to visitConvert)
- DeltaTable.convertToDelta utility is used (and DeltaConvert utility is requested to executeConvert)
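Either entry point can be exercised as follows (the /tmp/parquet-table path and the part column are made up for illustration; as noted above, the partition schema has to match the actual partitioning of the parquet table):

```scala
import io.delta.tables.DeltaTable

// CONVERT TO DELTA statement (parsed by DeltaSqlAstBuilder.visitConvert)
spark.sql("CONVERT TO DELTA parquet.`/tmp/parquet-table` PARTITIONED BY (part INT)")

// DeltaTable.convertToDelta utility (executed through DeltaConvert.executeConvert)
DeltaTable.convertToDelta(spark, "parquet.`/tmp/parquet-table`", "part INT")
```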
Executing Command¶
run(
spark: SparkSession): Seq[Row]
run
is part of the RunnableCommand
contract.
run first resolves the parquet table to convert (using the given SparkSession).
run makes sure that the (data source) provider (the database part of the TableIdentifier of the parquet table) is either delta or parquet. For all other data source providers, run throws an AnalysisException:
CONVERT TO DELTA only supports parquet tables, but you are trying to convert a [sourceName] source: [ident]
For delta
data source provider, run
simply prints out the following message to standard output and returns.
The table you are trying to convert is already a delta table
For parquet
data source provider, run
uses DeltaLog
utility to <run
then requests DeltaLog
to <run
<
In case the <-1
, run
simply prints out the following message to standard output and returns.
The table you are trying to convert is already a delta table
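Both short-circuit branches can be observed directly. A minimal sketch (the table paths are made up, and the comments paraphrase the messages described above):

```scala
// A provider other than delta or parquet fails the check
// (assuming a JSON table lives at the made-up path):
spark.sql("CONVERT TO DELTA json.`/tmp/json-table`")
// org.apache.spark.sql.AnalysisException: CONVERT TO DELTA only supports parquet tables,
// but you are trying to convert a json source: ...

// A table that is already in delta format is left alone:
spark.sql("CONVERT TO DELTA delta.`/tmp/delta-table`")
// prints to standard output:
// The table you are trying to convert is already a delta table
```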
Internal Helper Methods¶
performConvert¶
performConvert(
spark: SparkSession,
txn: OptimisticTransaction,
convertProperties: ConvertTarget): Seq[Row]
performConvert makes sure that the directory exists (from the given ConvertTarget, which is the table part of the TableIdentifier of the command).
performConvert requests the OptimisticTransaction for the DeltaLog (that is in turn requested to create the transaction log directory unless it already exists).
performConvert then recursively lists the files in the directory (excluding directories themselves using a WHERE clause).
NOTE: performConvert
uses Dataset
API to build a distributed computation to query files.
performConvert caches the Dataset of file names.
performConvert uses the spark.databricks.delta.import.batchSize.schemaInference configuration property for the size of the file batches used for schema inference. performConvert mergeSchemasInParallel for every batch of files and merges the partial schemas into a single data schema.
performConvert constructTableSchema (with the inferred data schema and the partition fields).

performConvert creates a new Metadata (with the table schema and the partition columns).

performConvert requests the OptimisticTransaction to update the metadata (with the new Metadata).
performConvert uses the spark.databricks.delta.import.batchSize.statsCollection configuration property for the size of the file batches used for statistics collection. performConvert createAddFile (with the data path of the DeltaLog of the OptimisticTransaction) for every file in a batch.
In the end, performConvert streamWrite (with the OptimisticTransaction, the AddFiles, and a Convert operation) and unpersists the Dataset of file names.
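The net effect of the conversion is a first commit that carries one AddFile per data file and a Convert operation. Assuming a parquet table exists at the made-up /tmp/parquet-table path, the result can be inspected after the fact:

```scala
import io.delta.tables.DeltaTable

DeltaTable.convertToDelta(spark, "parquet.`/tmp/parquet-table`")

// The very first commit of the new transaction log holds the AddFile actions...
spark.read
  .json("/tmp/parquet-table/_delta_log/00000000000000000000.json")
  .select("add.path", "add.partitionValues", "add.size")
  .where("add IS NOT NULL")
  .show(truncate = false)

// ...and the table history records the conversion operation (typically CONVERT).
spark.sql("DESCRIBE HISTORY delta.`/tmp/parquet-table`")
  .select("version", "operation")
  .show()
```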
streamWrite¶
streamWrite(
spark: SparkSession,
txn: OptimisticTransaction,
addFiles: Iterator[AddFile],
op: DeltaOperations.Operation,
numFiles: Long): Long
streamWrite
...FIXME
createAddFile¶
createAddFile(
file: SerializableFileStatus,
basePath: Path,
fs: FileSystem,
conf: SQLConf): AddFile
createAddFile
creates an AddFile action.
Internally, createAddFile
...FIXME
createAddFile throws an AnalysisException if the number of fields in the given partition schema does not match the number of partition values parsed from the file path:
Expecting [size] partition column(s): [expectedCols], but found [size] partition column(s): [parsedCols] from parsing the file name: [path]
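The partition values come from the Hive-style key=value directories between the table's base path and the data file. A minimal, self-contained sketch of that parsing and of the consistency check behind the exception (the helper and the paths are illustrative, not the command's actual code):

```scala
// Illustrative parsing of Hive-style partition directories from a file path
// (not the actual createAddFile implementation).
def parsePartitions(basePath: String, filePath: String): Seq[(String, String)] =
  filePath
    .stripPrefix(basePath)
    .split("/")
    .filter(_.contains("="))
    .map { dir =>
      val Array(key, value) = dir.split("=", 2)
      key -> value
    }
    .toSeq

val expectedCols = Seq("year", "month")
val parsed = parsePartitions(
  "/tmp/parquet-table",
  "/tmp/parquet-table/year=2020/month=01/part-0000.parquet")

// createAddFile raises an AnalysisException when these numbers differ.
require(parsed.size == expectedCols.size,
  s"Expecting ${expectedCols.size} partition column(s): $expectedCols, " +
    s"but found ${parsed.size} partition column(s): ${parsed.map(_._1)}")
```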
mergeSchemasInParallel¶
mergeSchemasInParallel(
sparkSession: SparkSession,
filesToTouch: Seq[FileStatus],
serializedConf: SerializableConfiguration): Option[StructType]
mergeSchemasInParallel
...FIXME
constructTableSchema¶
constructTableSchema(
spark: SparkSession,
dataSchema: StructType,
partitionFields: Seq[StructField]): StructType
constructTableSchema
...FIXME
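Going by the signature, constructTableSchema presumably combines the inferred data schema with the partition fields into the full table schema. A possible sketch of that combination (an assumption based on the signature, not the command's actual code):

```scala
import org.apache.spark.sql.types._

// Assumed behavior: data columns (minus any that clash with a partition column,
// compared case-insensitively here) followed by the partition columns.
def constructTableSchemaSketch(
    dataSchema: StructType,
    partitionFields: Seq[StructField]): StructType = {
  val partitionNames = partitionFields.map(_.name.toLowerCase).toSet
  val dataFields = dataSchema.filterNot(f => partitionNames.contains(f.name.toLowerCase))
  StructType(dataFields ++ partitionFields)
}

constructTableSchemaSketch(
  StructType(Seq(StructField("id", LongType), StructField("value", StringType))),
  Seq(StructField("year", IntegerType), StructField("month", IntegerType)))
// StructType(id, value, year, month)
```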
ConvertToDeltaCommandBase¶
ConvertToDeltaCommandBase
is the base of ConvertToDeltaCommand
-like commands with the only known implementation being ConvertToDeltaCommand
itself.