LogicalPlanStats — Statistics Estimates and Query Hints of Logical Operators
LogicalPlanStats is an extension of the LogicalPlan abstraction that adds Statistics for query planning, with or without cost-based optimization (e.g. CostBasedJoinReorder or JoinSelection, respectively).
Scala Definition
trait LogicalPlanStats { self: LogicalPlan =>
// body omitted
}
LogicalPlanStats is a Scala trait with a self-type annotation, self: LogicalPlan =>, as part of its definition. A self-type restricts the set of classes the trait can be mixed into (here, only LogicalPlan and its subtypes) and lets the trait's body use the members of LogicalPlan. The compiler checks both at compile time.
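To make the self-type concrete, here is a minimal, Spark-free sketch. Plan, PlanStats, Leaf and Join are hypothetical stand-ins invented for illustration, not Spark classes. The trait body calls Plan members (name, children) as if it extended Plan. Mixing the trait into a class that is not a Plan would be rejected by the compiler.

```scala
// Hypothetical stand-in for LogicalPlan (not Spark's class)
abstract class Plan {
  def name: String
  def children: Seq[Plan]
}

// Hypothetical stand-in for LogicalPlanStats: the self-type `self: Plan =>`
// restricts the trait to Plans and makes Plan's members usable inside it
trait PlanStats { self: Plan =>
  def describe: String = s"$name with ${children.size} child(ren)"
}

// A leaf operator: the case class field `name` implements Plan.name
case class Leaf(name: String) extends Plan with PlanStats {
  def children: Seq[Plan] = Nil
}

// A binary operator with two children
case class Join(left: Plan, right: Plan) extends Plan with PlanStats {
  def name: String = "Join"
  def children: Seq[Plan] = Seq(left, right)
}

// The following would not compile, because NotAPlan is not a Plan:
// class NotAPlan extends PlanStats

val join = Join(Leaf("a"), Leaf("b"))
println(join.describe) // Join with 2 child(ren)
```

The same trade-off applies to LogicalPlanStats. Unlike `extends LogicalPlan`, a self-type does not make the trait a subtype of LogicalPlan. It only records the requirement that whatever mixes it in must be one.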
Tip
Read up on self-types in the Tour of Scala.
Computing (and Caching) Statistics and Query Hints
stats: Statistics
stats returns the statistics from the cache if they have already been computed. Otherwise, stats checks whether cost-based optimization is enabled and requests BasicStatsPlanVisitor (CBO enabled) or SizeInBytesOnlyStatsPlanVisitor (CBO disabled) for the statistics.
The computed statistics are then cached in statsCache for later use.
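The compute-once-then-cache behaviour can be modelled in plain Scala. This is a simplified sketch, not Spark's code. Stats and Node are hypothetical stand-ins for Statistics and a logical operator. The private basicStats and sizeOnlyStats methods stand in for BasicStatsPlanVisitor and SizeInBytesOnlyStatsPlanVisitor. The CBO flag is passed in as a function instead of being read from SQLConf. The model also shows why the demo has to invalidate the cache after switching CBO on.

```scala
// Hypothetical, simplified stand-in for Spark's Statistics
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

// Hypothetical stand-in for a logical operator with cached statistics
final class Node(
    val sizeInBytes: BigInt,
    val rowCount: Option[BigInt],
    cboEnabled: () => Boolean) {

  // Counts how many times statistics were actually computed (for the demo)
  var computations = 0

  // Mirrors statsCache: empty until the first call to stats
  private var statsCache: Option[Stats] = None

  // Stand-in for BasicStatsPlanVisitor (CBO on): may report a row count
  private def basicStats(): Stats = Stats(sizeInBytes, rowCount)

  // Stand-in for SizeInBytesOnlyStatsPlanVisitor (CBO off): size only
  private def sizeOnlyStats(): Stats = Stats(sizeInBytes)

  // Return the cached value, or compute it once per the CBO flag and cache it
  def stats: Stats = statsCache.getOrElse {
    computations += 1
    val computed = if (cboEnabled()) basicStats() else sizeOnlyStats()
    statsCache = Some(computed)
    computed
  }

  // Drop the cached value so that the next call to stats recomputes it
  def invalidateStatsCache(): Unit = {
    statsCache = None
  }
}

var cbo = false
val node = new Node(BigInt(64), Some(BigInt(2)), () => cbo)

println(node.stats) // Stats(64,None)
cbo = true
println(node.stats) // Stats(64,None): still served from the cache
node.invalidateStatsCache()
println(node.stats) // Stats(64,Some(2)): recomputed with CBO on
```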
stats is used when:
- JoinSelection execution planning strategy matches a logical plan:
  - that is small enough for a broadcast join (using the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators)
  - whose single partition should be small enough to build a hash table (using the ShuffledHashJoinExec physical operator)
  - that is much smaller (3X) than the other plan (for the ShuffledHashJoinExec physical operator)
  - ...
- QueryExecution is requested for stringWithStats (for the EXPLAIN COST SQL command)
- CacheManager is requested to cache a Dataset or to recacheByCondition
- HiveMetastoreCatalog is requested for convertToLogicalRelation
- StarSchemaDetection
- CostBasedJoinReorder logical optimization is executed
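The JoinSelection conditions in the list above boil down to comparisons on stats.sizeInBytes. The following is a simplified sketch in the spirit of those checks. It works on plain BigInt sizes, and its helper names are hypothetical. They are not Spark's private methods. The thresholds assume the defaults of spark.sql.autoBroadcastJoinThreshold (10 MB) and spark.sql.shuffle.partitions (200). The real strategy also takes join types and hints into account.

```scala
// Assumed defaults: spark.sql.autoBroadcastJoinThreshold and spark.sql.shuffle.partitions
val autoBroadcastJoinThreshold: BigInt = BigInt(10L * 1024 * 1024) // 10 MB
val numShufflePartitions: Int = 200

// Small enough to broadcast (BroadcastHashJoinExec / BroadcastNestedLoopJoinExec)
def canBroadcastBySize(sizeInBytes: BigInt): Boolean =
  sizeInBytes >= 0 && sizeInBytes <= autoBroadcastJoinThreshold

// A single partition should be small enough for a local hash table
// (ShuffledHashJoinExec): total size below threshold * number of partitions
def canBuildLocalHashMap(sizeInBytes: BigInt): Boolean =
  sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions

// One side is much smaller (3X) than the other (ShuffledHashJoinExec)
def muchSmaller(a: BigInt, b: BigInt): Boolean = a * 3 <= b

val small = BigInt(64)                // e.g. the 64.0 B table from the demo
val large = BigInt(50L * 1024 * 1024) // 50 MB

println(canBroadcastBySize(small))   // true
println(canBroadcastBySize(large))   // false
println(canBuildLocalHashMap(large)) // true
println(muchSmaller(small, large))   // true
```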
Invalidating Statistics Cache
invalidateStatsCache(): Unit
invalidateStatsCache clears the statistics cache of the current logical operator and, recursively, of all its children (the whole subtree).
invalidateStatsCache is used when the AdaptiveSparkPlanExec physical operator is requested to reOptimize.
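The recursive invalidation can be sketched on a toy tree. In this sketch, a node clears its own cache first and then asks every child to do the same. PlanNode and its size arithmetic are hypothetical: a leaf reports a fixed size, and a parent sums its children's sizes. They only illustrate that, after one call on the root, no node in the subtree keeps a stale cached value. This presumably matters to AdaptiveSparkPlanExec because re-optimization should start from fresh estimates.

```scala
// Hypothetical tree node modelling statsCache and invalidateStatsCache
final class PlanNode(val name: String, leafSize: BigInt, val children: Seq[PlanNode]) {
  // Mirrors statsCache: None until statistics are computed
  var statsCache: Option[BigInt] = None

  // Leaves report their own size, parents sum their children (cached on first access)
  def stats: BigInt = statsCache.getOrElse {
    val computed = if (children.isEmpty) leafSize else children.map(_.stats).sum
    statsCache = Some(computed)
    computed
  }

  // Clear this node's cache, then recurse into every child
  def invalidateStatsCache(): Unit = {
    statsCache = None
    children.foreach(_.invalidateStatsCache())
  }

  // All nodes of this subtree, in pre-order
  def collectAll: Seq[PlanNode] = this +: children.flatMap(_.collectAll)
}

val left = new PlanNode("left", BigInt(48), Nil)
val right = new PlanNode("right", BigInt(16), Nil)
val join = new PlanNode("join", BigInt(0), Seq(left, right))

println(join.stats)                                     // 64
println(join.collectAll.forall(_.statsCache.isDefined)) // true: every node cached
join.invalidateStatsCache()
println(join.collectAll.exists(_.statsCache.isDefined)) // false: whole subtree cleared
```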
Statistics Cache
statsCache: Option[Statistics]
statsCache holds the Statistics once computed (until invalidated).
Demo: Accessing Statistics
Use the EXPLAIN COST SQL command to explain a query with the statistics.
scala> sql("EXPLAIN COST SHOW TABLES").as[String].collect.foreach(println)
== Optimized Logical Plan ==
ShowTablesCommand false, Statistics(sizeInBytes=1.0 B, hints=none)
== Physical Plan ==
Execute ShowTablesCommand
+- ShowTablesCommand false
You can access the statistics of a logical plan directly, using the stats method, or indirectly, by requesting QueryExecution for the text representation with statistics (stringWithStats).
val q = sql("SHOW TABLES")
scala> println(q.queryExecution.analyzed.stats)
Statistics(sizeInBytes=1.0 B, hints=none)
scala> println(q.queryExecution.stringWithStats)
== Optimized Logical Plan ==
ShowTablesCommand false, Statistics(sizeInBytes=1.0 B, hints=none)
== Physical Plan ==
Execute ShowTablesCommand
+- ShowTablesCommand false
val names = Seq((1, "one"), (2, "two")).toDF("id", "name")
assert(spark.sessionState.conf.cboEnabled == false, "CBO should be turned off")
// CBO is disabled and so only sizeInBytes stat is available
// FIXME Why is analyzed required (not just logical)?
val namesStatsCboOff = names.queryExecution.analyzed.stats
scala> println(namesStatsCboOff)
Statistics(sizeInBytes=48.0 B, hints=none)
// Turn CBO on
import org.apache.spark.sql.internal.SQLConf
spark.sessionState.conf.setConf(SQLConf.CBO_ENABLED, true)
// Make sure that CBO is really enabled
scala> println(spark.sessionState.conf.cboEnabled)
true
// Invalidate the stats cache
names.queryExecution.analyzed.invalidateStatsCache
// Check out the statistics
val namesStatsCboOn = names.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=48.0 B, hints=none)
// Despite CBO being enabled, we only get the sizeInBytes stat
// That's because names is a LocalRelation under the covers
scala> println(names.queryExecution.optimizedPlan.numberedTreeString)
00 LocalRelation [id#5, name#6]
// LocalRelation makes BasicStatsPlanVisitor fall back to its default case,
// which is exactly as if CBO were turned off
// Let's register names as a managed table
// That will change the rules of how stats are computed
import org.apache.spark.sql.SaveMode
names.write.mode(SaveMode.Overwrite).saveAsTable("names")
scala> spark.catalog.tableExists("names")
res5: Boolean = true
scala> spark.catalog.listTables.filter($"name" === "names").show
+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|names| default| null| MANAGED| false|
+-----+--------+-----------+---------+-----------+
val namesTable = spark.table("names")
// names is a managed table now
// And Relation (not LocalRelation)
scala> println(namesTable.queryExecution.optimizedPlan.numberedTreeString)
00 Relation[id#32,name#33] parquet
// Check out the statistics
val namesStatsCboOn = namesTable.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=1064.0 B, hints=none)
// Nothing has really changed, has it?
// Well, sizeInBytes is bigger, but that's the only stat available
// The rowCount stat requires ANALYZE TABLE without the NOSCAN option
sql("ANALYZE TABLE names COMPUTE STATISTICS")
// Invalidate the stats cache
namesTable.queryExecution.analyzed.invalidateStatsCache
// No change?! How so?
val namesStatsCboOn = namesTable.queryExecution.analyzed.stats
scala> println(namesStatsCboOn)
Statistics(sizeInBytes=1064.0 B, hints=none)
// Use optimized logical plan instead
val namesTableStats = spark.table("names").queryExecution.optimizedPlan.stats
scala> println(namesTableStats)
Statistics(sizeInBytes=64.0 B, rowCount=2, hints=none)
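The two sizes for the same two rows (48.0 B for the LocalRelation, 64.0 B for the analyzed table) can be reproduced by hand. The following is a reconstruction that matches the numbers printed above, not a quotation of Spark's code, and the formulas may differ across Spark versions. It assumes the following:

- The column default sizes are 4 bytes for IntegerType and 20 bytes for StringType.
- LocalRelation sums those default sizes per row.
- With a rowCount from ANALYZE TABLE and CBO on, the estimate adds a generic 8-byte per-row overhead.

```scala
// Assumed default sizes: IntegerType.defaultSize and StringType.defaultSize
val intDefaultSize = 4
val stringDefaultSize = 20
val rowCount = 2 // Seq((1, "one"), (2, "two"))

// Assumption: LocalRelation multiplies the summed column default sizes
// by the number of rows, with no per-row overhead
val localRelationSize = (intDefaultSize + stringDefaultSize) * rowCount

// Assumption: with a known rowCount (ANALYZE TABLE) and CBO on, each row
// also gets a generic 8-byte overhead
val rowOverhead = 8
val analyzedTableSize = (rowOverhead + intDefaultSize + stringDefaultSize) * rowCount

println(localRelationSize) // 48
println(analyzedTableSize) // 64
```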