ColumnPruning Logical Optimization
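ColumnPruning is a base logical optimization that removes columns a query does not use from the logical plan, so that operators lower in the plan produce only the attributes referenced by the operators above them.

ColumnPruning is part of the RewriteSubquery once-executed batch in the standard batches of the Logical Optimizer.

ColumnPruning is simply a Catalyst rule for transforming logical plans, i.e. Rule[LogicalPlan].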
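To make the idea concrete, here is a minimal, Spark-free sketch of what a column-pruning rule does to an aggregation over a wide relation. Every name in it (Plan, Relation, Project, Aggregate, ToyColumnPruning) is hypothetical and only loosely modelled on Catalyst; it is not the actual Spark implementation, which handles many more operators.

```scala
// Toy model only: these types are hypothetical stand-ins, not Catalyst classes.
sealed trait Plan { def output: Seq[String] }

// A leaf that produces a fixed set of columns.
case class Relation(output: Seq[String]) extends Plan

// Keeps only the listed columns of its child.
case class Project(columns: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = columns
}

// Groups by the given columns and adds a count.
case class Aggregate(groupBy: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = groupBy :+ "count"
}

object ToyColumnPruning {
  // If the child of an Aggregate produces columns the Aggregate never
  // references, put a Project in between that keeps only the referenced ones.
  def apply(plan: Plan): Plan = plan match {
    case a @ Aggregate(refs, child) if child.output.exists(c => !refs.contains(c)) =>
      a.copy(child = Project(child.output.filter(refs.contains), child))
    case Project(cols, child) => Project(cols, apply(child))
    case other => other
  }
}

val before = Aggregate(Seq("name"), Relation(Seq("id", "name", "city")))
val after = ToyColumnPruning(before)
println(after)
```

Applying the toy rule a second time leaves the plan unchanged, which is the property that lets an optimizer batch reach a fixed point.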
=== [[example1]] Example 1
[source, scala]
----
val dataset = spark.range(10).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val rankCol = rank over Window.partitionBy('bucket).orderBy('id) as "rank"

val ranked = dataset.withColumn("rank", rankCol)

scala> ranked.explain(true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Project [id#73L, bucket#76L, rank#192]    Project [id#73L, bucket#76L, rank#192]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]    +- Project [id#73L, bucket#76L, rank#82 AS rank#192]
    +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]       +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!      +- Project [id#73L, bucket#76L]          +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]             +- Range (0, 10, step=1, splits=Some(8))
!            +- Range (0, 10, step=1, splits=Some(8))
...
TRACE SparkOptimizer: Fixed point reached for batch Operator Optimizations after 2 iterations.
DEBUG SparkOptimizer:
=== Result of Batch Operator Optimizations ===
!Project [id#73L, bucket#76L, rank#192]    Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]
!+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192]    +- Project [id#73L, (id#73L % 3) AS bucket#76L]
!   +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC]       +- Range (0, 10, step=1, splits=Some(8))
!      +- Project [id#73L, bucket#76L]
!         +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L]
!            +- Range (0, 10, step=1, splits=Some(8))
...
----
=== [[example2]] Example 2
[source, scala]
----
// the business object
case class Person(id: Long, name: String, city: String)

// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS

// the query
// Note that we work with names only (out of 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count

scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]

== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
   +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
      +- LocalTableScan [name#126]
----
Executing Rule
apply(plan: LogicalPlan): LogicalPlan
apply ...FIXME

apply is part of the Rule abstraction.
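Because ColumnPruning is a Rule[LogicalPlan], it can also be applied by hand to an analyzed plan, outside the optimizer's batches. The following is a minimal sketch, assuming a spark-shell session in which spark and spark.implicits._ are already in scope; the query is the one from Example 2, and the value names (analyzed, pruned, insertedProjects) are made up for illustration. Expression IDs and the exact plan text vary between runs and Spark versions.

```scala
// Assumption: pasted into spark-shell, so `spark` and `spark.implicits._` are available.
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.functions.upper

case class Person(id: Long, name: String, city: String)

// The query from Example 2: only `name` is used out of the 3 attributes.
val query = Seq(Person(0, "Jacek", "Warsaw")).toDS
  .groupBy(upper($"name") as "name")
  .count

// The analyzed plan is what the optimizer rules work on.
val analyzed = query.queryExecution.analyzed

// ColumnPruning is a Scala object, so this calls ColumnPruning.apply(analyzed).
val pruned = ColumnPruning(analyzed)

println(analyzed.numberedTreeString)
println(pruned.numberedTreeString)

// Column names of every Project operator present after the rule ran.
val insertedProjects = pruned.collect { case p: Project => p.output.map(_.name) }
println(insertedProjects)
```

A single application only shows what this one rule contributes. The optimized plan in Example 2 is the result of the whole optimizer, in which other rules further simplify what ColumnPruning produces.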