== JoinEstimation
JoinEstimation is a utility that estimates statistics and query hints of a Join logical operator.

JoinEstimation is used by BasicStatsPlanVisitor to estimate statistics of a Join logical operator.
[[creating-instance]] [[join]] JoinEstimation takes a Join.md[Join] logical operator when created.
[[leftStats]] [[rightStats]] When created, JoinEstimation immediately takes the estimated statistics and query hints of the Join.md#left[left] and Join.md#right[right] sides of the Join logical operator.
[source, scala]
----
// JoinEstimation requires row count stats for join statistics estimates
// With cost-based optimization off, only size in bytes is available
// That would give no join estimates whatsoever (except size in bytes)
// Make sure that you start spark-shell with --conf spark.sql.cbo.enabled=true
scala> println(spark.sessionState.conf.cboEnabled)
true

// Build a query with join operator
// Of the available data sources, tables seem the best...so far
val r1 = spark.range(5)
scala> println(r1.queryExecution.analyzed.stats.simpleString)
sizeInBytes=40.0 B, hints=none

// Make the demo reproducible
val db = spark.catalog.currentDatabase
spark.sharedState.externalCatalog.dropTable(db, table = "t1", ignoreIfNotExists = true, purge = true)
spark.sharedState.externalCatalog.dropTable(db, table = "t2", ignoreIfNotExists = true, purge = true)

// FIXME What relations give row count stats?

// Register tables
spark.range(5).write.saveAsTable("t1")
spark.range(10).write.saveAsTable("t2")

// Refresh internal registries
sql("REFRESH TABLE t1")
sql("REFRESH TABLE t2")

// Calculate row count stats
val tables = Seq("t1", "t2")
tables.map(t => s"ANALYZE TABLE $t COMPUTE STATISTICS").foreach(sql)

val t1 = spark.table("t1")
val t2 = spark.table("t2")

// analyzed plan is just before withCachedData and optimizedPlan plans
// where CostBasedJoinReorder kicks in and optimizes a query using statistics

val t1plan = t1.queryExecution.analyzed
scala> println(t1plan.numberedTreeString)
00 SubqueryAlias t1
01 +- Relation[id#45L] parquet

// Show the stats of every node in the analyzed query plan

val p0 = t1plan.p(0)
scala> println(s"Statistics of ${p0.simpleString}: ${p0.stats.simpleString}")
Statistics of SubqueryAlias t1: sizeInBytes=80.0 B, hints=none

val p1 = t1plan.p(1)
scala> println(s"Statistics of ${p1.simpleString}: ${p1.stats.simpleString}")
Statistics of Relation[id#45L] parquet: sizeInBytes=80.0 B, rowCount=5, hints=none

val t2plan = t2.queryExecution.analyzed

// Let's get rid of the SubqueryAlias operator
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
val t1NoAliasesPlan = EliminateSubqueryAliases(t1plan)
val t2NoAliasesPlan = EliminateSubqueryAliases(t2plan)

// Using Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
val plan = t1NoAliasesPlan.join(
  otherPlan = t2NoAliasesPlan,
  joinType = Inner,
  condition = Some($"id".expr))
scala> println(plan.numberedTreeString)
00 'Join Inner, 'id
01 :- Relation[id#45L] parquet
02 +- Relation[id#57L] parquet

// Take Join operator off the logical plan
// JoinEstimation works with Joins only
import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan.collect { case j: Join => j }.head

// Make sure that row count stats are defined per join side
scala> join.left.stats.rowCount.isDefined
res1: Boolean = true

scala> join.right.stats.rowCount.isDefined
res2: Boolean = true

// Make the example reproducible
// Computing stats is a one-time process and the estimates are cached
join.invalidateStatsCache

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation
val stats = JoinEstimation(join).estimate
scala> :type stats
Option[org.apache.spark.sql.catalyst.plans.logical.Statistics]

// Stats have to be available so Option.get should just work
scala> println(stats.get.simpleString)
sizeInBytes=1200.0 B, rowCount=50, hints=none
----
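The estimates in the demo above can be checked by hand. The join condition is just id (no equi-join keys), and the row count matches a cartesian product of t1 (5 rows) and t2 (10 rows). The sketch below reproduces both numbers under two assumptions that this page does not state: a join without equi-join keys is estimated as a cartesian product, and the size of a row is 8 bytes of overhead plus 8 bytes per bigint column. CartesianJoinEstimate is a hypothetical helper, not a Spark API.

```scala
// Hypothetical helper that reproduces the arithmetic behind the demo output.
// Assumptions (not confirmed by this page): 8 bytes of per-row overhead,
// 8 bytes per bigint (LongType) column, and a cartesian-product estimate
// when the join has no equi-join keys.
object CartesianJoinEstimate {
  val RowOverheadBytes: BigInt = 8
  val LongColumnBytes: BigInt = 8

  /** Estimated output rows when every left row pairs with every right row. */
  def outputRows(leftRows: BigInt, rightRows: BigInt): BigInt =
    leftRows * rightRows

  /** Estimated size in bytes of `rows` rows with `numLongColumns` bigint columns. */
  def sizeInBytes(rows: BigInt, numLongColumns: Int): BigInt =
    rows * (RowOverheadBytes + LongColumnBytes * numLongColumns)

  def main(args: Array[String]): Unit = {
    val rows = outputRows(leftRows = 5, rightRows = 10) // t1 has 5 rows, t2 has 10
    val size = sizeInBytes(rows, numLongColumns = 2)    // id from t1 and id from t2
    println(s"rowCount=$rows, sizeInBytes=$size")       // rowCount=50, sizeInBytes=1200
  }
}
```

With 24 bytes per output row, 50 rows give the 1200 bytes that JoinEstimation reported.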
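JoinEstimation can estimate statistics and query hints of a Join logical operator with the following join types: Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi and LeftAnti.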
For the other join types (e.g. ExistenceJoin), JoinEstimation prints out a DEBUG message to the logs and returns None (to "announce" that no statistics could be computed).
[source, scala]
----
// Demo: Unsupported join type, i.e. ExistenceJoin

// Some parts were copied from the earlier demo
// FIXME Make it self-contained

// Using Catalyst DSL
// Not sure whether such an existence join is ever possible in Spark SQL
// For demo purposes it's OK, isn't it?
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
val left = t1NoAliasesPlan
val right = t2NoAliasesPlan
val plan = left.join(right, joinType = ExistenceJoin(exists = 'id.long))

// Take Join operator off the logical plan
// JoinEstimation works with Joins only
import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan.collect { case j: Join => j }.head

// Enable DEBUG logging level
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation").setLevel(Level.DEBUG)

scala> val stats = JoinEstimation(join).estimate
18/06/13 10:29:37 DEBUG JoinEstimation: [CBO] Unsupported join type: ExistenceJoin(id#35L)
stats: Option[org.apache.spark.sql.catalyst.plans.logical.Statistics] = None
----
[source, scala]
----
// FIXME Describe the purpose of the demo

// Using Catalyst DSL
import org.apache.spark.sql.catalyst.dsl.plans._

val t1 = table(ref = "t1")

// HACK: Disable symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

import org.apache.spark.sql.catalyst.dsl.expressions._
val id = 'id.long

val t2 = table("t2")
import org.apache.spark.sql.catalyst.plans.LeftSemi
val plan = t1.join(t2, joinType = LeftSemi, condition = Some(id))
scala> println(plan.numberedTreeString)
00 'Join LeftSemi, id#2: bigint
01 :- 'UnresolvedRelation t1
02 +- 'UnresolvedRelation t2

import org.apache.spark.sql.catalyst.plans.logical.Join
val join = plan match { case j: Join => j }

import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation

// FIXME java.lang.UnsupportedOperationException
val stats = JoinEstimation(join).estimate
----
[[logging]]
[TIP]
====
Enable DEBUG logging level for org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

----
logger.JoinEstimation.name = org.apache.spark.sql.catalyst.plans.logical.statsEstimation.JoinEstimation
logger.JoinEstimation.level = debug
----

Refer to spark-logging.md[Logging].
====
=== [[estimateInnerOuterJoin]] estimateInnerOuterJoin Internal Method
[source, scala]
----
estimateInnerOuterJoin(): Option[Statistics]
----
estimateInnerOuterJoin destructures <
estimateInnerOuterJoin simply returns None (i.e. nothing) when either side of the Join logical operator has no row count statistic.
NOTE: estimateInnerOuterJoin is used exclusively when JoinEstimation is requested to estimate statistics and query hints of a Join logical operator for Inner, Cross, LeftOuter, RightOuter and FullOuter joins.
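The "no row count, no estimate" rule maps naturally onto Option in Scala. A minimal sketch, assuming a hypothetical SideStats class in place of Spark's Statistics; the yield only multiplies the row counts (the no-equi-join-keys case seen in the first demo) and is not the full estimation logic.

```scala
// Hypothetical, simplified stand-in for a plan's statistics (not Spark's Statistics).
final case class SideStats(sizeInBytes: BigInt, rowCount: Option[BigInt])

object InnerOuterJoinGuard {
  /**
   * The for-comprehension short-circuits to None as soon as either side
   * has no row count statistic. Multiplying the row counts is an
   * illustrative assumption covering joins without equi-join keys only.
   */
  def estimateRows(left: SideStats, right: SideStats): Option[BigInt] =
    for {
      leftRows  <- left.rowCount
      rightRows <- right.rowCount
    } yield leftRows * rightRows
}
```

Using Option rather than a null check keeps the "nothing could be estimated" case explicit in the type, which matches the Option[Statistics] result type of estimateInnerOuterJoin.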
=== [[computeByNdv]] computeByNdv Internal Method
[source, scala]
----
computeByNdv(
  leftKey: AttributeReference,
  rightKey: AttributeReference,
  newMin: Option[Any],
  newMax: Option[Any]): (BigInt, ColumnStat)
----
computeByNdv...FIXME
NOTE: computeByNdv is used exclusively when JoinEstimation is requested for <
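Judging by its name, computeByNdv estimates join cardinality from the number of distinct values (NDV) of the join keys. The classic textbook estimate for an equi-join on a single key is |L ⋈ R| ≈ |L| · |R| / max(ndv(L.key), ndv(R.key)). The sketch below assumes computeByNdv follows that formula (this page does not confirm it) and leaves out the ColumnStat that the method also returns; NdvJoinCardinality is a hypothetical name.

```scala
object NdvJoinCardinality {
  /**
   * Textbook single-key equi-join estimate:
   *   |L join R| ~= |L| * |R| / max(ndv(L.key), ndv(R.key))
   * Assumption: this is the formula computeByNdv relies on.
   */
  def cardinality(
      leftRows: BigInt,
      rightRows: BigInt,
      leftNdv: BigInt,
      rightNdv: BigInt): BigInt = {
    val maxNdv = leftNdv.max(rightNdv)
    require(maxNdv > 0, "distinct counts must be positive")
    // Ceiling division, so a non-empty product never estimates to zero rows.
    (leftRows * rightRows + maxNdv - 1) / maxNdv
  }
}
```

For example, 5 rows with 5 distinct ids joined with 10 rows with 10 distinct ids gives 5 * 10 / 10 = 5 rows.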
=== [[computeCardinalityAndStats]] computeCardinalityAndStats Internal Method
[source, scala]
----
computeCardinalityAndStats(
  keyPairs: Seq[(AttributeReference, AttributeReference)]): (BigInt, AttributeMap[ColumnStat])
----
computeCardinalityAndStats...FIXME
NOTE: computeCardinalityAndStats is used exclusively when JoinEstimation is requested for <
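computeCardinalityAndStats takes pairs of join keys, and the other internal methods suggest a choice between computeByHistogram and computeByNdv for every key pair. The sketch below shows one plausible way to combine them and rests on two assumptions this page does not confirm: the histogram-based estimate is used only when both keys have histograms, and the result starts from the cartesian product and keeps the smallest (most selective) per-key estimate. KeyStats and CardinalityAcrossKeys are hypothetical, and both estimators are passed in as functions.

```scala
// Hypothetical per-key statistics (not Spark's ColumnStat).
final case class KeyStats(distinctCount: BigInt, hasHistogram: Boolean)

object CardinalityAcrossKeys {
  def estimate(
      leftRows: BigInt,
      rightRows: BigInt,
      keyPairs: Seq[(KeyStats, KeyStats)],
      byHistogram: (KeyStats, KeyStats) => BigInt,
      byNdv: (KeyStats, KeyStats) => BigInt): BigInt =
    // Start from the cartesian product and keep the smallest per-key estimate.
    keyPairs.foldLeft(leftRows * rightRows) { case (best, (l, r)) =>
      val card =
        if (l.hasHistogram && r.hasHistogram) byHistogram(l, r) // both keys have histograms
        else byNdv(l, r)                                        // fall back to distinct counts
      best.min(card)
    }
}
```

With no key pairs the estimate stays at the cartesian product, which is consistent with the 50 rows of the first demo.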
=== [[computeByHistogram]] Computing Join Cardinality Using Equi-Height Histograms -- computeByHistogram Internal Method
[source, scala]
----
computeByHistogram(
  leftKey: AttributeReference,
  rightKey: AttributeReference,
  leftHistogram: Histogram,
  rightHistogram: Histogram,
  newMin: Option[Any],
  newMax: Option[Any]): (BigInt, ColumnStat)
----
computeByHistogram...FIXME
NOTE: computeByHistogram is used exclusively when JoinEstimation is requested for <
=== [[estimateLeftSemiAntiJoin]] Estimating Statistics for Left Semi and Left Anti Joins -- estimateLeftSemiAntiJoin Internal Method
[source, scala]
----
estimateLeftSemiAntiJoin(): Option[Statistics]
----
estimateLeftSemiAntiJoin estimates statistics of the Join logical operator only when the row count statistic of the left side is available. Otherwise, estimateLeftSemiAntiJoin simply returns None (i.e. no statistics estimated).
NOTE: The cost-based-optimization/index.md#rowCount[row count] statistic of a table is available only after running the cost-based-optimization/index.md#ANALYZE-TABLE[ANALYZE TABLE COMPUTE STATISTICS] SQL command.
If available, estimateLeftSemiAntiJoin takes the estimated row count statistic of the Join.md#left[left side] of the Join logical operator.
NOTE: Use cost-based-optimization/index.md#ANALYZE-TABLE[ANALYZE TABLE COMPUTE STATISTICS] SQL command on the left logical plan to compute cost-based-optimization/index.md#rowCount[row count] statistics.
NOTE: Use cost-based-optimization/index.md#ANALYZE-TABLE[ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS] SQL command on the left logical plan to generate column (equi-height) histograms for more accurate estimations.
In the end, estimateLeftSemiAntiJoin creates a new Statistics with the following estimates:
. Total size (in bytes) is the output size for the output schema of the join, the row count statistic (aka output rows) and column histograms.
. Row count is exactly the row count of the left side
. Column histograms are exactly the column histograms of the left side
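The three estimates above boil down to "copy the left side, recompute the size". A minimal sketch with a hypothetical SimpleStats class in place of Spark's Statistics; bytesPerOutputRow is an assumed stand-in for the output-size computation over the join's output schema, and column histograms are modelled as plain bin boundaries.

```scala
// Hypothetical simplified statistics (not Spark's Statistics or ColumnStat).
final case class SimpleStats(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt],
    columnHistograms: Map[String, Seq[Double]]) // column name -> bin boundaries

object LeftSemiAntiEstimate {
  /** No left row count means no estimate (None). */
  def estimate(left: SimpleStats, bytesPerOutputRow: BigInt): Option[SimpleStats] =
    left.rowCount.map { rows =>
      SimpleStats(
        sizeInBytes = rows * bytesPerOutputRow,   // output size from the output rows
        rowCount = Some(rows),                    // exactly the left side's row count
        columnHistograms = left.columnHistograms) // exactly the left side's histograms
    }
}
```

Propagating the left side unchanged means the estimate is an upper bound on the row count: a left semi or left anti join can only drop rows from its left input, never add them.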
estimateLeftSemiAntiJoin is used when JoinEstimation is requested to estimate statistics and query hints of a Join logical operator for LeftSemi and LeftAnti joins.
=== [[estimate]] Estimating Statistics and Query Hints of Join Logical Operator -- estimate Method
[source, scala]
----
estimate: Option[Statistics]
----
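estimate estimates statistics and query hints of the Join logical operator per join type:

- For Inner, Cross, LeftOuter, RightOuter and FullOuter join types, estimate uses estimateInnerOuterJoin

- For LeftSemi and LeftAnti join types, estimate uses estimateLeftSemiAntiJoin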
For other join types, estimate prints out the following DEBUG message to the logs and returns None (to "announce" that no statistics could be computed).
[CBO] Unsupported join type: [joinType]
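The dispatch on the join type can be sketched as a pattern match. The JoinType trait and case objects below are a hypothetical stand-in for Catalyst's join types, the two estimators are passed in as functions, and logDebug is an assumed logging callback; only the DEBUG message text comes from this section.

```scala
// Hypothetical join-type ADT standing in for Catalyst's JoinType hierarchy.
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
final case class ExistenceJoin(exists: String) extends JoinType

object EstimateDispatch {
  def estimate(
      joinType: JoinType,
      innerOuter: () => Option[BigInt],
      leftSemiAnti: () => Option[BigInt],
      logDebug: String => Unit): Option[BigInt] = joinType match {
    case Inner | Cross | LeftOuter | RightOuter | FullOuter => innerOuter()
    case LeftSemi | LeftAnti => leftSemiAnti()
    case other =>
      // Any other join type: log and "announce" that nothing was estimated.
      logDebug(s"[CBO] Unsupported join type: $other")
      None
  }
}
```

Passing the estimators as functions keeps the sketch focused on the routing; in JoinEstimation itself they are the estimateInnerOuterJoin and estimateLeftSemiAntiJoin internal methods.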
NOTE: estimate is used exclusively when BasicStatsPlanVisitor is requested to estimate statistics and query hints of a Join logical operator.