AnalyzeColumnCommand Logical Command

AnalyzeColumnCommand is a logical command that represents the AnalyzeColumn logical operator (i.e. an ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS SQL command).

AnalyzeColumnCommand is not supported on views (unless they are cached).
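
The SQL statements that end up as this command look as follows (a minimal sketch, assuming a t1 table with id and p1 columns; the FOR ALL COLUMNS variant corresponds to the allColumns flag):

// Analyze selected columns only
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id, p1")

// Analyze all columns (Spark 3.0 and later); enables the allColumns flag
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS")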

Creating Instance

AnalyzeColumnCommand takes the following to be created:

  • Table
  • Column Names
  • allColumns Flag
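
For illustration, a minimal sketch that creates the command directly, assuming the constructor takes a TableIdentifier, an optional list of column names and the allColumns flag (exactly one of the last two is expected to be defined):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand

// Analyze the id and p1 columns of the t1 table
val cmd = AnalyzeColumnCommand(
  tableIdent = TableIdentifier("t1"),
  columnNames = Some(Seq("id", "p1")),
  allColumns = false)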

AnalyzeColumnCommand is created when the AnalyzeColumn logical operator is resolved (at logical analysis).

Executing Command

run(
  sparkSession: SparkSession): Seq[Row]

run is part of the RunnableCommand abstraction.

run calculates the following statistics:

  • sizeInBytes (the total size of the table in bytes)
  • Column statistics for each of the specified columns

Computing Statistics for Specified Columns

computeColumnStats(
  sparkSession: SparkSession,
  tableIdent: TableIdentifier,
  columnNames: Seq[String]): (Long, Map[String, ColumnStat])

computeColumnStats...FIXME
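
The return type hints at what is computed: the table's row count together with a ColumnStat per analyzed column. A rough, hypothetical illustration of that kind of single-pass aggregation using the public Dataset API (not the actual code of computeColumnStats):

import org.apache.spark.sql.functions.{approx_count_distinct, count, lit, max, min}

// Row count and simple per-column aggregates for id and p1 in one pass (illustrative only)
val aggregates = spark.table("t1")
  .agg(
    count(lit(1)).as("rowCount"),
    min("id"), max("id"), approx_count_distinct("id"),
    min("p1"), max("p1"), approx_count_distinct("p1"))
  .head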

Computing Percentiles

computePercentiles(
  attributesToAnalyze: Seq[Attribute],
  sparkSession: SparkSession,
  relation: LogicalPlan): AttributeMap[ArrayData]

computePercentiles...FIXME
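
computePercentiles gives, per attribute, an array of percentile values that serve as bin boundaries of the equi-height column histograms shown in the demo below (with n bins, the boundaries sit at the 0/n, 1/n, ..., n/n quantiles). A rough, hypothetical illustration using the percentile_approx SQL function (not the actual code of computePercentiles):

// Bin boundaries for an equi-height histogram of p1 with 4 bins (illustrative only)
spark.sql("""
  SELECT percentile_approx(p1, array(0.0, 0.25, 0.5, 0.75, 1.0), 10000) AS boundaries
  FROM t1""").show(truncate = false)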

analyzeColumnInCatalog

analyzeColumnInCatalog(
  sparkSession: SparkSession): Unit

analyzeColumnInCatalog requests the SessionCatalog for the table metadata (getTableMetadata).

For VIEW catalog tables, analyzeColumnInCatalog analyzes the columnNames only when the view is cached (and throws an AnalysisException otherwise).

For EXTERNAL and MANAGED catalog tables, analyzeColumnInCatalog determines the columns to analyze (getColumnsToAnalyze) for the columnNames.

analyzeColumnInCatalog then computes the column statistics (computeColumnStats) for the columns to analyze.

analyzeColumnInCatalog converts the column stats to CatalogColumnStat.

analyzeColumnInCatalog creates a CatalogStatistics with the following:

  • sizeInBytes: the total table size (calculateTotalSize)
  • rowCount: the row count from computeColumnStats
  • colStats: the existing column statistics with the new CatalogColumnStats applied

In the end, analyzeColumnInCatalog requests the SessionCatalog to alter the table with the new CatalogStatistics.
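
For reference, a minimal sketch of what such a statistics object could look like, assuming CatalogStatistics(sizeInBytes, rowCount, colStats) and a CatalogColumnStat with distinctCount, min, max, nullCount, avgLen and maxLen fields (the values below come from the demo; the real instance is built internally by analyzeColumnInCatalog):

import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics}

// Hypothetical, hand-built equivalent of the statistics stored for the id column of t1
val newStats = CatalogStatistics(
  sizeInBytes = BigInt(1421),
  rowCount = Some(BigInt(2)),
  colStats = Map(
    "id" -> CatalogColumnStat(
      distinctCount = Some(BigInt(2)),
      min = Some("0"),
      max = Some("1"),
      nullCount = Some(BigInt(0)),
      avgLen = Some(4L),
      maxLen = Some(4L))))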

AnalysisException

analyzeColumnInCatalog throws the following AnalysisException unless the catalog view is cached:

ANALYZE TABLE is not supported on views.
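
For instance (a minimal sketch, assuming the t1 table from the demo below and a hypothetical v1 view):

// Analyzing a plain (non-cached) view is not supported
spark.sql("CREATE OR REPLACE VIEW v1 AS SELECT id, p1 FROM t1")
spark.sql("ANALYZE TABLE v1 COMPUTE STATISTICS FOR COLUMNS id")
// org.apache.spark.sql.AnalysisException: ANALYZE TABLE is not supported on views.

// Caching the view first makes the columns analyzable
spark.sql("CACHE TABLE v1")
spark.sql("ANALYZE TABLE v1 COMPUTE STATISTICS FOR COLUMNS id")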

Demo: Column Histograms

AnalyzeColumnCommand can generate column histograms when spark.sql.statistics.histogram.enabled configuration property is enabled. AnalyzeColumnCommand supports column histograms for the following data types:

  • IntegralType
  • DecimalType
  • DoubleType
  • FloatType
  • DateType
  • TimestampType
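
The property can also be enabled at runtime rather than via --conf at spark-shell startup (as done in the snippet below); a minimal sketch, assuming the properties are runtime-settable:

spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
// Optionally control the number of bins (254 by default)
spark.conf.set("spark.sql.statistics.histogram.numBins", "100")
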
// ./bin/spark-shell --conf spark.sql.statistics.histogram.enabled=true
// Use the demo below to set up the environment (the t1 table and tableName)
// Make sure that ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS was run with histograms enabled

// There are 254 bins by default
// Use spark.sql.statistics.histogram.numBins to control the number of bins
val descExtSQL = s"DESC EXTENDED $tableName p1"
scala> spark.sql(descExtSQL).show(truncate = false)
+--------------+-----------------------------------------------------+
|info_name     |info_value                                           |
+--------------+-----------------------------------------------------+
|col_name      |p1                                                   |
|data_type     |double                                               |
|comment       |NULL                                                 |
|min           |0.0                                                  |
|max           |1.4                                                  |
|num_nulls     |0                                                    |
|distinct_count|2                                                    |
|avg_col_len   |8                                                    |
|max_col_len   |8                                                    |
|histogram     |height: 0.007874015748031496, num_of_bins: 254       |
|bin_0         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_1         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_2         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_3         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_4         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_5         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_6         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_7         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_8         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_9         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
+--------------+-----------------------------------------------------+
only showing top 20 rows

Demo

// Make the example reproducible
val tableName = "t1"
import org.apache.spark.sql.catalyst.TableIdentifier
val tableId = TableIdentifier(tableName)

val sessionCatalog = spark.sessionState.catalog
sessionCatalog.dropTable(tableId, ignoreIfNotExists = true, purge = true)

val df = Seq((0, 0.0, "zero"), (1, 1.4, "one")).toDF("id", "p1", "p2")
df.write.saveAsTable("t1")

// AnalyzeColumnCommand represents ANALYZE TABLE...FOR COLUMNS SQL command
val allCols = df.columns.mkString(",")
val analyzeTableSQL = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS $allCols"
val plan = spark.sql(analyzeTableSQL).queryExecution.logical
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
val cmd = plan.asInstanceOf[AnalyzeColumnCommand]
scala> println(cmd)
AnalyzeColumnCommand `t1`, [id, p1, p2]

spark.sql(analyzeTableSQL)
val stats = sessionCatalog.getTableMetadata(tableId).stats.get
scala> println(stats.simpleString)
1421 bytes, 2 rows

scala> stats.colStats.map { case (c, ss) => s"$c: $ss" }.foreach(println)
id: ColumnStat(2,Some(0),Some(1),0,4,4,None)
p1: ColumnStat(2,Some(0.0),Some(1.4),0,8,8,None)
p2: ColumnStat(2,None,None,0,4,4,None)

// Use DESC EXTENDED for friendlier output
scala> sql(s"DESC EXTENDED $tableName id").show
+--------------+----------+
|     info_name|info_value|
+--------------+----------+
|      col_name|        id|
|     data_type|       int|
|       comment|      NULL|
|           min|         0|
|           max|         1|
|     num_nulls|         0|
|distinct_count|         2|
|   avg_col_len|         4|
|   max_col_len|         4|
|     histogram|      NULL|
+--------------+----------+