InsertIntoHadoopFsRelationCommand Logical Command
InsertIntoHadoopFsRelationCommand is a logical write command (a DataWritingCommand) that writes the result of a query to an output path in the given FileFormat.
Creating Instance
InsertIntoHadoopFsRelationCommand takes the following to be created:

- Output Path (as a Hadoop Path)
- Static Partitions
- ifPartitionNotExists flag
- Partition Columns (Seq[Attribute])
- BucketSpec (optional)
- FileFormat
- Options (Map[String, String])
- Query (a logical plan)
- SaveMode
- CatalogTable (optional)
- FileIndex (optional)
- Names of the output columns
InsertIntoHadoopFsRelationCommand is created when:

- DataSource is requested to planForWritingFileFormat
- DataSourceAnalysis logical resolution rule is executed (for an InsertIntoStatement over a LogicalRelation with a HadoopFsRelation)
- RelationConversions logical evaluation rule is executed
Static Partitions
```scala
type TablePartitionSpec = Map[String, String]
staticPartitions: TablePartitionSpec
```
InsertIntoHadoopFsRelationCommand is given a specification of a table partition (a mapping of partition column names to values) when created.

Static partitions can only be given when the command is created by the DataSourceAnalysis post-hoc logical resolution rule, when it is executed for an InsertIntoStatement over a LogicalRelation with a HadoopFsRelation.
There are no static partitions when the command is created by the following:

- OptimizedCreateHiveTableAsSelectCommand logical command
- DataSource when requested to planForWritingFileFormat
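The split between static and dynamic partition columns can be illustrated with a minimal plain-Scala sketch (no Spark dependency). It assumes that, as in an insert statement's partition spec, every partition column maps to an optional value: PARTITION (year = '2024', month) gives year a value and leaves month unset. splitPartitionSpec is a hypothetical helper; only the TablePartitionSpec shape comes from the text above.

```scala
object StaticPartitionsSketch {
  // Same shape as TablePartitionSpec above: partition column name -> value
  type TablePartitionSpec = Map[String, String]

  // Hypothetical helper (not Spark API): columns that come with a value are
  // static partitions; columns without a value are dynamic partition columns.
  def splitPartitionSpec(
      spec: Map[String, Option[String]]): (TablePartitionSpec, Seq[String]) = {
    val static = spec.collect { case (col, Some(value)) => col -> value }
    val dynamic = spec.collect { case (col, None) => col }.toSeq
    (static, dynamic)
  }
}
```

For INSERT INTO t PARTITION (year = '2024', month) SELECT ..., the sketch yields Map("year" -> "2024") as the static partitions and Seq("month") as the dynamic partition columns. An insert without a PARTITION clause yields an empty TablePartitionSpec.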
Dynamic Partition Inserts and dynamicPartitionOverwrite Flag
```scala
dynamicPartitionOverwrite: Boolean
```
InsertIntoHadoopFsRelationCommand defines a dynamicPartitionOverwrite flag to indicate whether dynamic partition inserts are enabled.
dynamicPartitionOverwrite is based on the following (in order of precedence):

- partitionOverwriteMode option (STATIC or DYNAMIC) in the parameters, if available
- spark.sql.sources.partitionOverwriteMode configuration property
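The precedence above can be sketched in plain Scala. This is a simplified, hypothetical model with no Spark dependency: the PartitionOverwriteMode trait, resolveMode and the sessionDefault parameter (standing in for spark.sql.sources.partitionOverwriteMode, which defaults to STATIC) are illustrative names, not Spark API. The option key is matched case-insensitively because Spark looks up data source options in a case-insensitive map. In recent Spark versions the command also requires SaveMode.Overwrite and at least one dynamic partition column (fewer static partitions than partition columns) before the flag is true; the sketch models only the option-versus-configuration precedence.

```scala
object PartitionOverwriteModeSketch {
  sealed trait PartitionOverwriteMode
  case object Static extends PartitionOverwriteMode
  case object Dynamic extends PartitionOverwriteMode

  // Mode values are accepted in any letter case
  private def parse(value: String): PartitionOverwriteMode =
    value.toUpperCase(java.util.Locale.ROOT) match {
      case "STATIC"  => Static
      case "DYNAMIC" => Dynamic
      case other =>
        throw new IllegalArgumentException(s"Unknown partitionOverwriteMode: $other")
    }

  // The per-write option wins; otherwise fall back to the session-wide default
  def resolveMode(options: Map[String, String], sessionDefault: String): PartitionOverwriteMode = {
    val fromOptions = options.collectFirst {
      case (key, value) if key.equalsIgnoreCase("partitionOverwriteMode") => value
    }
    parse(fromOptions.getOrElse(sessionDefault))
  }

  // Hypothetical simplification of the flag: true only for the DYNAMIC mode
  def dynamicPartitionOverwrite(options: Map[String, String], sessionDefault: String): Boolean =
    resolveMode(options, sessionDefault) == Dynamic
}
```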
dynamicPartitionOverwrite is used when:

- DataSourceAnalysis logical resolution rule is executed (for dynamic partition overwrite)
- InsertIntoHadoopFsRelationCommand is executed
Executing Command
```scala
run(
  sparkSession: SparkSession,
  child: SparkPlan): Seq[Row]
```
run is part of the DataWritingCommand abstraction.
run creates a new Hadoop Configuration with the options and resolves the outputPath to a fully-qualified path using the output path's Hadoop FileSystem.
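Conceptually, the per-write options are layered on top of the session's Hadoop settings, so an option overrides a session value with the same key. The sketch below models a Hadoop Configuration as an immutable Map so it runs without Hadoop on the classpath; confWithOptions is a hypothetical name, not Spark API. Skipping null values and the path and paths keys is an assumption based on recent Spark versions; verify it against the version you use.

```scala
object HadoopConfOverlaySketch {
  // Hypothetical model: a Hadoop Configuration as an immutable key-value map.
  // Each option is applied on top of the session-wide settings, in turn.
  def confWithOptions(
      sessionConf: Map[String, String],
      options: Map[String, String]): Map[String, String] =
    options.foldLeft(sessionConf) { case (conf, (key, value)) =>
      // Assumed: null values and the data source path keys are not copied
      if (value != null && key != "path" && key != "paths") conf.updated(key, value)
      else conf
    }
}
```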
run determines whether partitions are tracked by a catalog (partitionsTrackedByCatalog). This is the case only when all of the following hold:

- spark.sql.hive.manageFilesourcePartitions configuration property is enabled
- catalogTable is defined with non-empty partitionColumnNames and the tracksPartitionsInCatalog flag enabled
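The conditions above combine into a single conjunction. A minimal sketch, assuming a hypothetical CatalogTableInfo case class that stands in for the two CatalogTable fields involved, and a Boolean that stands in for the spark.sql.hive.manageFilesourcePartitions setting:

```scala
object PartitionTrackingSketch {
  // Hypothetical stand-in for the CatalogTable fields the check reads
  final case class CatalogTableInfo(
      partitionColumnNames: Seq[String],
      tracksPartitionsInCatalog: Boolean)

  // manageFilesourcePartitions models spark.sql.hive.manageFilesourcePartitions
  def partitionsTrackedByCatalog(
      manageFilesourcePartitions: Boolean,
      catalogTable: Option[CatalogTableInfo]): Boolean =
    manageFilesourcePartitions && catalogTable.exists { table =>
      table.partitionColumnNames.nonEmpty && table.tracksPartitionsInCatalog
    }
}
```

Using Option.exists keeps the "catalogTable is defined" condition and the per-table checks in one expression: an undefined catalogTable simply yields false.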
FIXME: When is the catalogTable defined?

FIXME: When is tracksPartitionsInCatalog enabled?
With partitions tracked by a catalog, run ...FIXME
run uses the FileCommitProtocol utility to instantiate a FileCommitProtocol based on the spark.sql.sources.commitProtocolClass configuration property with the following:

- Random job ID
- outputPath
- dynamicPartitionOverwrite flag
For insertion (doInsertion), run ...FIXME
Otherwise (for a non-insertion case), run does nothing but print out the following INFO message to the logs and finish:

```text
Skipping insertion into a relation that already exists.
```
run makes sure that there are no duplicates in the outputColumnNames.
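A minimal sketch of such a duplicate check in plain Scala, assuming (as Spark does) that the comparison follows the analyzer's case sensitivity (spark.sql.caseSensitive, false by default). duplicateColumns and checkNoDuplicates are hypothetical helpers. Spark reports duplicates as an analysis error with its own message, rather than the IllegalArgumentException that require throws here.

```scala
object DuplicateColumnsSketch {
  // Returns the (normalized) column names that occur more than once, sorted.
  // With caseSensitive = false, "id" and "ID" count as the same column.
  def duplicateColumns(names: Seq[String], caseSensitive: Boolean): Seq[String] = {
    val normalized =
      if (caseSensitive) names else names.map(_.toLowerCase(java.util.Locale.ROOT))
    normalized
      .groupBy(identity)
      .collect { case (name, occurrences) if occurrences.size > 1 => name }
      .toSeq
      .sorted
  }

  // Fails fast before any data is written when a duplicate is found
  def checkNoDuplicates(names: Seq[String], caseSensitive: Boolean): Unit = {
    val dups = duplicateColumns(names, caseSensitive)
    require(dups.isEmpty, s"Found duplicate column(s): ${dups.mkString(", ")}")
  }
}
```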