InsertIntoHadoopFsRelationCommand Logical Command¶
InsertIntoHadoopFsRelationCommand is a write command that is used to write the result of a query to an output path (in the given format).
Creating Instance¶
InsertIntoHadoopFsRelationCommand takes the following to be created:
- Output Path (as a Hadoop Path)
- Static Partitions
- ifPartitionNotExists Flag
- Partition Columns (Seq[Attribute])
- BucketSpec (optional)
- FileFormat
- Options (Map[String, String])
- Query
- SaveMode
- CatalogTable (optional)
- FileIndex (optional)
- Names of the output columns
InsertIntoHadoopFsRelationCommand is created when:
- DataSource is requested to planForWritingFileFormat
- DataSourceAnalysis logical resolution rule is executed (for an InsertIntoStatement over a LogicalRelation with a HadoopFsRelation)
- RelationConversions logical evaluation rule is executed
Static Partitions¶
type TablePartitionSpec = Map[String, String]
staticPartitions: TablePartitionSpec
InsertIntoHadoopFsRelationCommand is given a specification of a table partition (as a mapping of column names to column values) when created.
Partitions can only be given when the command is created by the DataSourceAnalysis posthoc logical resolution rule (executed for an InsertIntoStatement over a LogicalRelation with a HadoopFsRelation).
There will be no partitions when created for the following:
- OptimizedCreateHiveTableAsSelectCommand logical command
- DataSource when requested to planForWritingFileFormat
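As an illustration, the following sketch shows what a static partition specification might look like for a hypothetical statement such as `INSERT INTO sales PARTITION (year = 2024, month) SELECT ...`. The `sales` table, its column names, and the `dynamicPartitionColumns` helper are assumptions made for this example. Only the `TablePartitionSpec` alias comes from the text above.

```scala
object StaticPartitionsSketch {
  // Same alias as shown above: partition column name -> partition value
  type TablePartitionSpec = Map[String, String]

  // Hypothetical partition columns of an imaginary `sales` table
  val partitionColumns: Seq[String] = Seq("year", "month")

  // Only `year` is given a value in the statement, so only `year` is static
  val staticPartitions: TablePartitionSpec = Map("year" -> "2024")

  // Hypothetical helper: partition columns without a static value
  // get their values from the rows being written (dynamic partitions)
  def dynamicPartitionColumns(
      columns: Seq[String],
      spec: TablePartitionSpec): Seq[String] =
    columns.filterNot(spec.contains)
}
```

In this sketch, `month` is the only column left without a static value, so it would be treated as a dynamic partition column.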
Dynamic Partition Inserts and dynamicPartitionOverwrite Flag¶
dynamicPartitionOverwrite: Boolean
InsertIntoHadoopFsRelationCommand defines a dynamicPartitionOverwrite flag to indicate whether dynamic partition inserts are enabled or not.
dynamicPartitionOverwrite is based on the following (in the order of precedence):
- partitionOverwriteMode option (STATIC or DYNAMIC) in the parameters if available
- spark.sql.sources.partitionOverwriteMode
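The order of precedence above can be sketched in plain Scala. This is a minimal model, not Spark's implementation: `resolveMode` and `isDynamic` are hypothetical helpers, the options are a plain `Map` rather than Spark's own options type, and comparing values case-insensitively is an assumption. The sketch covers the precedence only and does not model any other conditions the command may apply.

```scala
import java.util.Locale

object PartitionOverwriteModeSketch {
  // Option key named in the text above
  val OptionKey = "partitionOverwriteMode"

  // Hypothetical helper: a per-write option wins over the session-level
  // default (the value of spark.sql.sources.partitionOverwriteMode).
  // Assumption: mode names are compared case-insensitively.
  def resolveMode(options: Map[String, String], sessionDefault: String): String =
    options.getOrElse(OptionKey, sessionDefault).toUpperCase(Locale.ROOT)

  // Hypothetical helper: true when the effective mode is DYNAMIC
  def isDynamic(options: Map[String, String], sessionDefault: String): Boolean =
    resolveMode(options, sessionDefault) == "DYNAMIC"
}
```

With Spark on the classpath, the per-write option would typically be supplied through `DataFrameWriter.option("partitionOverwriteMode", "dynamic")`, which takes precedence over the session-wide property for that write.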
dynamicPartitionOverwrite is used when:
- DataSourceAnalysis logical resolution rule is executed (for dynamic partition overwrite)
- InsertIntoHadoopFsRelationCommand is executed
Executing Command¶
DataWritingCommand
run(
sparkSession: SparkSession,
child: SparkPlan): Seq[Row]
run is part of the DataWritingCommand abstraction.
run creates a new Hadoop Configuration with the options and resolves the outputPath.
run uses the following to determine whether partitions are tracked by a catalog (partitionsTrackedByCatalog):
- spark.sql.hive.manageFilesourcePartitions configuration property
- catalogTable is defined with the partitionColumnNames and tracksPartitionsInCatalog flag enabled
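The check above can be modelled as a simple predicate. This is a sketch under stated assumptions: `TableInfo` is a hypothetical, simplified stand-in for the catalog table metadata, the conditions are assumed to be combined with a logical AND, and "defined with the partitionColumnNames" is read as "has at least one partition column".

```scala
object PartitionsTrackedByCatalogSketch {
  // Hypothetical, simplified stand-in for the catalog table metadata
  final case class TableInfo(
      partitionColumnNames: Seq[String],
      tracksPartitionsInCatalog: Boolean)

  // Assumption: every condition has to hold for partitions to be
  // considered tracked by a catalog
  def partitionsTrackedByCatalog(
      manageFilesourcePartitions: Boolean,
      catalogTable: Option[TableInfo]): Boolean =
    manageFilesourcePartitions &&
      catalogTable.exists { table =>
        table.partitionColumnNames.nonEmpty && table.tracksPartitionsInCatalog
      }
}
```

Under these assumptions, a write without a catalog table (`None`) is never treated as tracked by a catalog, regardless of the configuration property.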
FIXME
When is the catalogTable defined?
FIXME
When is tracksPartitionsInCatalog enabled?
With partitions tracked by a catalog, run...FIXME
run uses FileCommitProtocol utility to instantiate a FileCommitProtocol based on the spark.sql.sources.commitProtocolClass with the following:
- Random job ID
- outputPath
- dynamicPartitionOverwrite
For insertion (doInsertion), run...FIXME
Otherwise (for a non-insertion case), run only prints out the following INFO message to the logs and finishes.
Skipping insertion into a relation that already exists.
run makes sure that there are no duplicates in the outputColumnNames.
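A duplicate check of this kind could look like the following sketch. It is not Spark's code: `findDuplicates` and `requireNoDuplicates` are hypothetical helpers, and the `caseSensitive` switch is an assumption added for illustration (the text above does not say how names are compared).

```scala
import java.util.Locale

object DuplicateColumnsSketch {
  // Hypothetical helper: returns the column names that occur more than once.
  // Assumption: with caseSensitive = false, names are compared in lower case.
  def findDuplicates(names: Seq[String], caseSensitive: Boolean): Seq[String] = {
    val normalized =
      if (caseSensitive) names else names.map(_.toLowerCase(Locale.ROOT))
    normalized
      .groupBy(identity)
      .collect { case (name, occurrences) if occurrences.size > 1 => name }
      .toSeq
      .sorted
  }

  // Hypothetical helper: fails fast when any output column name is duplicated
  def requireNoDuplicates(names: Seq[String], caseSensitive: Boolean): Unit = {
    val duplicates = findDuplicates(names, caseSensitive)
    require(
      duplicates.isEmpty,
      s"Found duplicate column(s): ${duplicates.mkString(", ")}")
  }
}
```

Failing before any data is written keeps a bad set of output column names from producing partially written output.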