LogicalPlanStats — Statistics Estimates and Query Hints of Logical Operators

LogicalPlanStats is an extension of the LogicalPlan abstraction to add Statistics for query planning (with or without cost-based optimization, e.g. CostBasedJoinReorder or JoinSelection, respectively).

Scala Definition

trait LogicalPlanStats { self: LogicalPlan =>
  // body omitted
}

LogicalPlanStats is a Scala trait that declares self: LogicalPlan as its self-type. A self-type restricts the set of classes that the trait can be mixed into to LogicalPlan and its subtypes, and makes the members of LogicalPlan available inside the trait at compile time.

Tip

Read up on self-type in Scala in the Tour of Scala.
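The self-type restriction can be illustrated with a minimal, self-contained sketch. Plan, PlanStats, Leaf and Node are hypothetical stand-ins invented for this example (they are not Spark classes); the point is only that the trait can call members of the type named in its self-type.

```scala
object SelfTypeSketch {
  // Hypothetical stand-in for LogicalPlan (not Spark's class)
  abstract class Plan {
    def children: Seq[Plan]
  }

  // The self-type says: whatever mixes in PlanStats must also be a Plan
  trait PlanStats { self: Plan =>
    // children is declared in Plan, yet usable here because of the self-type
    def numChildren: Int = children.size
  }

  case class Leaf(name: String) extends Plan with PlanStats {
    def children: Seq[Plan] = Seq.empty
  }

  case class Node(children: Seq[Plan]) extends Plan with PlanStats

  // Would be rejected by the compiler, since Broken is not a Plan:
  // class Broken extends PlanStats
}
```

Mixing PlanStats into anything that is not a Plan is a compile-time error, which is the guarantee the paragraph above describes.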

Computing (and Caching) Statistics and Query Hints

stats: Statistics

stats returns the statistics from statsCache if they have been computed already. Otherwise, stats requests BasicStatsPlanVisitor (when cost-based optimization is enabled) or SizeInBytesOnlyStatsPlanVisitor (when it is disabled) for the statistics.

statsCache caches the statistics for later use.
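The compute-once-then-cache behaviour can be sketched with a toy model. Everything here is an assumption made for illustration: Stats, Plan, BasicVisitor and SizeOnlyVisitor are hypothetical simplifications of Statistics, LogicalPlan, BasicStatsPlanVisitor and SizeInBytesOnlyStatsPlanVisitor, and the cboEnabled function stands in for the session configuration.

```scala
object StatsCacheSketch {
  // Hypothetical, simplified stand-in for Statistics
  final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  // Hypothetical visitors: the "basic" one also reports a row count
  object BasicVisitor {
    def visit(p: Plan): Stats = Stats(p.size, Some(p.rows))
  }
  object SizeOnlyVisitor {
    def visit(p: Plan): Stats = Stats(p.size)
  }

  class Plan(val size: BigInt, val rows: BigInt, cboEnabled: () => Boolean) {
    // Counts how many times the statistics were actually computed
    var computations = 0

    protected var statsCache: Option[Stats] = None

    // Return the cached value if present; otherwise pick a visitor
    // based on the CBO flag, remember the result, and return it
    def stats: Stats = statsCache.getOrElse {
      computations += 1
      val computed =
        if (cboEnabled()) BasicVisitor.visit(this) else SizeOnlyVisitor.visit(this)
      statsCache = Some(computed)
      computed
    }
  }
}
```

In this sketch, flipping the flag after the first call to stats has no effect on the returned value, because the cached result wins. That is why a cache has to be invalidated before changed settings can show up in the statistics.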

stats is used when:

Invalidating Statistics Cache

invalidateStatsCache(): Unit

invalidateStatsCache clears the cache of the current logical operator and all of the children.

invalidateStatsCache is used when AdaptiveSparkPlanExec physical operator is requested to reOptimize.
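The recursive clearing of caches can be sketched on a toy tree. Node, its stats value (here simply a node count) and invalidate are hypothetical names chosen for this example; the sketch only assumes, as described above, that invalidation resets the current operator's cache and then does the same for every child.

```scala
object InvalidateSketch {
  // Hypothetical tree node with a per-node cache (not a Spark class)
  class Node(val children: Seq[Node] = Seq.empty) {
    private var cache: Option[Long] = None

    def isCached: Boolean = cache.isDefined

    // Toy "statistic": the number of nodes in this subtree.
    // Computing it populates the caches of all descendants too.
    def stats: Long = cache.getOrElse {
      val computed = 1L + children.map(_.stats).sum
      cache = Some(computed)
      computed
    }

    // Clear this node's cache, then recurse into the children
    def invalidate(): Unit = {
      cache = None
      children.foreach(_.invalidate())
    }
  }
}
```

Clearing only the root would leave stale values in the children, so the next computation at the root would still be built from outdated numbers.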

Statistics Cache

statsCache: Option[Statistics]

statsCache holds the Statistics once computed (until invalidated).

Demo: Accessing Statistics

Use EXPLAIN COST SQL command to explain a query with the statistics.

scala> sql("EXPLAIN COST SHOW TABLES").as[String].collect.foreach(println)
== Optimized Logical Plan ==
ShowTablesCommand false, Statistics(sizeInBytes=1.0 B, hints=none)

== Physical Plan ==
Execute ShowTablesCommand
   +- ShowTablesCommand false

You can also access the statistics of a logical plan directly using the stats method, or indirectly by requesting QueryExecution for the text representation with statistics.

val q = sql("SHOW TABLES")
scala> println(q.queryExecution.analyzed.stats)
Statistics(sizeInBytes=1.0 B, hints=none)

scala> println(q.queryExecution.stringWithStats)
== Optimized Logical Plan ==
ShowTablesCommand false, Statistics(sizeInBytes=1.0 B, hints=none)

== Physical Plan ==
Execute ShowTablesCommand
   +- ShowTablesCommand false

val names = Seq((1, "one"), (2, "two")).toDF("id", "name")

assert(spark.sessionState.conf.cboEnabled == false, "CBO should be turned off")

// CBO is disabled and so only sizeInBytes stat is available
// FIXME Why is analyzed required (not just logical)?
val namesStatsCboOff = names.queryExecution.analyzed.stats
scala> println(namesStatsCboOff)
Statistics(sizeInBytes=48.0 B, hints=none)

// Turn CBO on
import org.apache.spark.sql.internal.SQLConf
spark.sessionState.conf.setConf(SQLConf.CBO_ENABLED, true)

// Make sure that CBO is really enabled
scala> println(spark.sessionState.conf.cboEnabled)
true

// Invalidate the stats cache
names.queryExecution.analyzed.invalidateStatsCache

// Check out the statistics
val namesStatsCboOn = names.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=48.0 B, hints=none)

// Despite CBO enabled, we can only get sizeInBytes stat
// That's because names is a LocalRelation under the covers
scala> println(names.queryExecution.optimizedPlan.numberedTreeString)
00 LocalRelation [id#5, name#6]

// LocalRelation triggers BasicStatsPlanVisitor to execute default case
// which is exactly as if we had CBO turned off

// Let's register names as a managed table
// That will change the rules of how stats are computed
import org.apache.spark.sql.SaveMode
names.write.mode(SaveMode.Overwrite).saveAsTable("names")

scala> spark.catalog.tableExists("names")
res5: Boolean = true

scala> spark.catalog.listTables.filter($"name" === "names").show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|names| default|       null|  MANAGED|      false|
+-----+--------+-----------+---------+-----------+

val namesTable = spark.table("names")

// names is a managed table now
// And Relation (not LocalRelation)
scala> println(namesTable.queryExecution.optimizedPlan.numberedTreeString)
00 Relation[id#32,name#33] parquet

// Check out the statistics
val namesStatsCboOn = namesTable.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=1064.0 B, hints=none)

// Nothing has really changed, has it?
// Well, sizeInBytes is bigger, but that's the only stat available
// The row count stat requires ANALYZE TABLE without the NOSCAN option
sql("ANALYZE TABLE names COMPUTE STATISTICS")

// Invalidate the stats cache
namesTable.queryExecution.analyzed.invalidateStatsCache

// No change?! How so?
val namesStatsCboOn = namesTable.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=1064.0 B, hints=none)

// Use optimized logical plan instead
val namesTableStats = spark.table("names").queryExecution.optimizedPlan.stats
scala> println(namesTableStats)
Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)