Skip to content

ColumnPruning Logical Optimization

ColumnPruning is a base logical optimization that <>.

ColumnPruning is part of the RewriteSubquery once-executed batch in the standard batches of the Logical Optimizer.

ColumnPruning is simply a <> for transforming <>, i.e. Rule[LogicalPlan].

=== [[example1]] Example 1

[source, scala]

val dataset = spark.range(10).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window val rankCol = rank over Window.partitionBy('bucket).orderBy('id) as "rank"

val ranked = dataset.withColumn("rank", rankCol)

scala> ranked.explain(true) ... TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning === Project [id#73L, bucket#76L, rank#192] Project [id#73L, bucket#76L, rank#192] !+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192] +- Project [id#73L, bucket#76L, rank#82 AS rank#192] +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] ! +- Project [id#73L, bucket#76L] +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L] ! +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L] +- Range (0, 10, step=1, splits=Some(8)) ! +- Range (0, 10, step=1, splits=Some(8)) ... TRACE SparkOptimizer: Fixed point reached for batch Operator Optimizations after 2 iterations. DEBUG SparkOptimizer: === Result of Batch Operator Optimizations === !Project [id#73L, bucket#76L, rank#192] Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] !+- Project [id#73L, bucket#76L, rank#82, rank#82 AS rank#192] +- Project [id#73L, (id#73L % 3) AS bucket#76L] ! +- Window [rank(id#73L) windowspecdefinition(bucket#76L, id#73L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#82], [bucket#76L], [id#73L ASC] +- Range (0, 10, step=1, splits=Some(8)) ! +- Project [id#73L, bucket#76L] ! +- Project [id#73L, (id#73L % cast(3 as bigint)) AS bucket#76L] ! +- Range (0, 10, step=1, splits=Some(8)) ...


=== [[example2]] Example 2

[source, scala]

// the business object case class Person(id: Long, name: String, city: String)

// the dataset to query over val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS

// the query // Note that we work with names only (out of 3 attributes in Person) val query = dataset.groupBy(upper('name) as 'name).count

scala> query.explain(extended = true) ... TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning === Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L] Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L] !+- LocalRelation [id#125L, name#126, city#127] +- Project [name#126] ! +- LocalRelation [id#125L, name#126, city#127] ... == Parsed Logical Plan == 'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L] +- LocalRelation [id#125L, name#126, city#127]

== Analyzed Logical Plan == name: string, count: bigint Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L] +- LocalRelation [id#125L, name#126, city#127]

== Optimized Logical Plan == Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L] +- LocalRelation [name#126]

== Physical Plan == *HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L]) +- Exchange hashpartitioning(upper(name#126)#171, 200) +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L]) +- LocalTableScan [name#126]


Executing Rule

apply(plan: LogicalPlan): LogicalPlan

apply...FIXME

apply is part of the Rule abstraction.