# InsertIntoDataSourceCommand Logical Command

`InsertIntoDataSourceCommand` is a `RunnableCommand` that inserts or overwrites data in an `InsertableRelation` (per the `overwrite` flag).

`InsertIntoDataSourceCommand` is created when the `DataSourceAnalysis` post-hoc logical resolution rule resolves an `InsertIntoTable` logical operator with a `LogicalRelation` over an `InsertableRelation`.
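The target of the command is any relation that implements Spark's `InsertableRelation` contract (a single `insert(data, overwrite)` method in `org.apache.spark.sql.sources`). The sketch below is self-contained for illustration: `DataFrame` is a stub standing in for `org.apache.spark.sql.DataFrame`, and `InMemoryRelation` is a toy implementation showing how the `overwrite` flag is honored.

```scala
// Stub standing in for org.apache.spark.sql.DataFrame
case class DataFrame(rows: Seq[Long])

// Sketch of the InsertableRelation contract
trait InsertableRelation {
  // insert writes the rows of data into the relation;
  // when overwrite is true, existing data is replaced first
  def insert(data: DataFrame, overwrite: Boolean): Unit
}

// Toy in-memory relation showing how overwrite is honored
class InMemoryRelation extends InsertableRelation {
  var rows: Seq[Long] = Seq.empty
  def insert(data: DataFrame, overwrite: Boolean): Unit =
    rows = if (overwrite) data.rows else rows ++ data.rows
}
```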
```scala
sql("DROP TABLE IF EXISTS t2")
sql("CREATE TABLE t2(id long)")

val query = "SELECT * FROM RANGE(1)"
// Using INSERT INTO SQL statement so we can access QueryExecution
// DataFrameWriter.insertInto returns no value
val q = sql("INSERT INTO TABLE t2 " + query)

val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]

val analyzedPlan = q.queryExecution.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 InsertIntoHiveTable `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id#6L]
01 +- Project [id#6L]
02    +- Range (0, 1, step=1, splits=None)
```
`InsertIntoDataSourceCommand` returns the `query` logical plan when requested for the inner nodes (that are displayed as an inner nested tree of this command).
```scala
val query = "SELECT * FROM RANGE(1)"
val sqlText = "INSERT INTO TABLE t2 " + query
val plan = spark.sessionState.sqlParser.parsePlan(sqlText)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]
```
## Creating Instance

`InsertIntoDataSourceCommand` takes the following to be created:

- `LogicalRelation` leaf logical operator
- Logical query plan (`query`)
- `overwrite` flag
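Put together, the command is essentially a case class over those three values. The sketch below stubs the logical-plan types so it stands alone (the real class lives in `org.apache.spark.sql.execution.datasources` and extends `RunnableCommand`); it also shows the `innerChildren` override that nests the query under this node.

```scala
// Stubs standing in for Spark's logical-plan types
trait LogicalPlan
case class LogicalRelation(relationName: String) extends LogicalPlan

// Sketch of the command: three constructor values, plus the
// innerChildren override that renders the query as an inner tree
case class InsertIntoDataSourceCommand(
    logicalRelation: LogicalRelation,  // target relation (a leaf operator)
    query: LogicalPlan,                // plan producing the rows to insert
    overwrite: Boolean)                // replace existing data when true
  extends LogicalPlan {
  def innerChildren: Seq[LogicalPlan] = Seq(query)
}
```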
## Executing Logical Command (Inserting or Overwriting Data in InsertableRelation) -- run Method

```scala
run(
  session: SparkSession): Seq[Row]
```

`run` is part of the `RunnableCommand` abstraction.
`run` takes the `InsertableRelation` (that is the relation of the `logicalRelation`).

`run` then creates a `DataFrame` for the `query` and the input `SparkSession`.

`run` requests the `DataFrame` for the `QueryExecution` that in turn is requested for the RDD (of the structured query). `run` requests the `logicalRelation` for the output schema.

With the RDD and the output schema, `run` creates another `DataFrame`, i.e. an `RDD[InternalRow]` with the schema applied.

`run` requests the `InsertableRelation` to insert or overwrite data (per the `overwrite` flag).

In the end, since the data in the `InsertableRelation` has changed, `run` requests the `CacheManager` to `recacheByPlan` with the `logicalRelation`.

NOTE: `run` requests the `SparkSession` for the `SharedState` that is used to access the `CacheManager`.
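The flow above can be simulated end to end with stub types. Everything below is a stand-in (the schema-application step is elided); only the order of steps mirrors the description: materialize the query rows, insert or overwrite, then refresh the stale cache entry, returning no rows.

```scala
// Stub types; each stands in for its Spark counterpart
case class Row(values: Seq[Any])

trait InsertableRelation {
  def insert(data: Seq[Row], overwrite: Boolean): Unit
}

class InMemoryRelation extends InsertableRelation {
  var rows: Seq[Row] = Seq.empty
  def insert(data: Seq[Row], overwrite: Boolean): Unit =
    rows = if (overwrite) data else rows ++ data
}

class CacheManager {
  var recachedPlans: Seq[String] = Seq.empty
  // drops and rebuilds cache entries that depend on the given plan
  def recacheByPlan(plan: String): Unit = recachedPlans :+= plan
}

// Sketch of run: the materialized query rows go to the relation
// (insert or overwrite), then the cached plan is refreshed;
// the command itself produces no rows
def run(
    relation: InsertableRelation,
    queryRows: Seq[Row],
    overwrite: Boolean,
    cacheManager: CacheManager): Seq[Row] = {
  relation.insert(queryRows, overwrite)
  cacheManager.recacheByPlan("logicalRelation")
  Seq.empty[Row]
}
```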