SparkSqlAstBuilder — ANTLR-based SQL Parser¶
SparkSqlAstBuilder is an AstBuilder that converts SQL statements into Catalyst expressions, logical plans or table identifiers (using visit callbacks).
Creating Instance¶
SparkSqlAstBuilder takes no arguments to be created.
SparkSqlAstBuilder is created for SparkSqlParser (which happens when SparkSession is requested for SessionState).

expr Standard Function
SparkSqlAstBuilder can also be temporarily created for expr standard function (to create column expressions).
import org.apache.spark.sql.functions.expr
val c = expr("from_json(value, schema)")
scala> :type c
org.apache.spark.sql.Column
scala> :type c.expr
org.apache.spark.sql.catalyst.expressions.Expression
scala> println(c.expr.numberedTreeString)
00 'from_json('value, 'schema)
01 :- 'value
02 +- 'schema
Accessing SparkSqlAstBuilder¶
scala> :type spark.sessionState.sqlParser
org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.SparkSqlParser
val sqlParser = spark.sessionState.sqlParser.asInstanceOf[SparkSqlParser]
scala> :type sqlParser.astBuilder
org.apache.spark.sql.execution.SparkSqlAstBuilder
Visit Callbacks¶
visitAnalyze¶
Creates AnalyzeColumnCommand, AnalyzePartitionCommand or AnalyzeTableCommand logical commands.
ANTLR labeled alternative: #analyze
NOSCAN Identifier
visitAnalyze supports the NOSCAN identifier only and reports a ParseException when any other identifier is used.
NOSCAN is used for AnalyzePartitionCommand and AnalyzeTableCommand logical commands only.
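A quick check (a sketch that reuses the t1 table from the examples below; FULLSCAN is a made-up identifier): any identifier other than NOSCAN fails at parse time.
scala> spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FULLSCAN")
org.apache.spark.sql.catalyst.parser.ParseException:
...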
AnalyzeColumnCommand¶
AnalyzeColumnCommand logical command for ANALYZE TABLE with FOR COLUMNS clause (but no PARTITION specification)
// Seq((0, 0, "zero"), (1, 1, "one")).toDF("id", "p1", "p2").write.partitionBy("p1", "p2").saveAsTable("t1")
val sqlText = "ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id, p1"
val plan = spark.sql(sqlText).queryExecution.logical
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
val cmd = plan.asInstanceOf[AnalyzeColumnCommand]
scala> println(cmd)
AnalyzeColumnCommand `t1`, [id, p1]
AnalyzePartitionCommand¶
AnalyzePartitionCommand logical command for ANALYZE TABLE with PARTITION specification (but no FOR COLUMNS clause)
// Seq((0, 0, "zero"), (1, 1, "one")).toDF("id", "p1", "p2").write.partitionBy("p1", "p2").saveAsTable("t1")
val analyzeTable = "ANALYZE TABLE t1 PARTITION (p1, p2) COMPUTE STATISTICS"
val plan = spark.sql(analyzeTable).queryExecution.logical
import org.apache.spark.sql.execution.command.AnalyzePartitionCommand
val cmd = plan.asInstanceOf[AnalyzePartitionCommand]
scala> println(cmd)
AnalyzePartitionCommand `t1`, Map(p1 -> None, p2 -> None), false
AnalyzeTableCommand¶
AnalyzeTableCommand logical command for ANALYZE TABLE with neither PARTITION specification nor FOR COLUMNS clause
// Seq((0, 0, "zero"), (1, 1, "one")).toDF("id", "p1", "p2").write.partitionBy("p1", "p2").saveAsTable("t1")
val sqlText = "ANALYZE TABLE t1 COMPUTE STATISTICS NOSCAN"
val plan = spark.sql(sqlText).queryExecution.logical
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
val cmd = plan.asInstanceOf[AnalyzeTableCommand]
scala> println(cmd)
AnalyzeTableCommand `t1`, false
visitGenericFileFormat¶
Creates a CatalogStorageFormat with the Hive SerDe for the given data source name, which can be one of the following (with their Hive-supported variants): sequencefile, rcfile, orc, parquet, textfile, avro.
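To see the Hive SerDe resolution in action, parse a CREATE TABLE ... STORED AS statement using the sqlParser instance from Accessing SparkSqlAstBuilder above (a sketch with a hypothetical hive_orc table; the exact plan rendering varies across Spark versions).
// The resulting plan carries a CatalogStorageFormat with the ORC serde and input/output format classes
scala> println(sqlParser.parsePlan("CREATE TABLE hive_orc (id INT) STORED AS ORC"))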
CACHE TABLE¶
Creates a CacheTableCommand logical command for the following SQL statement:
CACHE [LAZY] TABLE table_name [[AS] query]
ANTLR labeled alternative: #cacheTable
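A parse-only sketch (hypothetical cached_t1 name; t1 as in the ANALYZE TABLE examples above; the rendering of the logical command varies across Spark versions):
scala> println(sqlParser.parsePlan("CACHE LAZY TABLE cached_t1 AS SELECT * FROM t1"))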
visitCreateHiveTable¶
Creates a CreateTable logical operator
ANTLR labeled alternative: #createHiveTable
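A parse-only sketch (hypothetical hive_t table name and /tmp/hive_t location):
scala> println(sqlParser.parsePlan("CREATE EXTERNAL TABLE hive_t (id INT) STORED AS PARQUET LOCATION '/tmp/hive_t'"))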
CREATE FLOW AS INSERT INTO BY NAME¶
Creates an append (insert into) flow (a CreateFlowCommand logical operator) for CREATE FLOW SQL statement
CREATE FLOW [ flow_name ]
AS INSERT INTO [ destination_name ] BY NAME
ANTLR labeled alternative: #createPipelineInsertIntoFlow
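A parse-only sketch, assuming a Spark build with Declarative Pipelines (hypothetical ingest_flow, target_table and source_table names, and the trailing query is an assumption; a flow is only executable as part of a pipeline definition):
scala> println(sqlParser.parsePlan("CREATE FLOW ingest_flow AS INSERT INTO target_table BY NAME SELECT * FROM source_table"))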
CREATE TABLE¶
Creates a CreateTempViewUsing logical operator for CREATE TEMPORARY VIEW USING or falls back to AstBuilder (to create either a CreateTableAsSelect or a CreateTable)
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] identifierReference
[(createOrReplaceTableColTypeList)]
[USING multipartIdentifier]
createTableClauses
[[AS] query]
ANTLR labeled alternative: #createTable
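A parse-only sketch of the fallback path for a data source table (hypothetical t2 name):
scala> println(sqlParser.parsePlan("CREATE TABLE t2 (id INT) USING parquet"))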
Create Pipeline Dataset¶
Creates the following logical operators for Spark Declarative Pipelines framework:
| Logical Operator | SQL Statement |
|---|---|
| CreateMaterializedViewAsSelect | CREATE MATERIALIZED VIEW |
| CreateStreamingTableAsSelect | CREATE STREAMING TABLE ... AS |
| CreateStreamingTable | CREATE STREAMING TABLE |
CREATE MATERIALIZED VIEW (IF NOT EXISTS)? [ materialized_view_identifier ]
[ column_definitions ]?
[ create_table_options ]?
AS [ query ]
CREATE STREAMING TABLE (IF NOT EXISTS)? [ streaming_table_identifier ]
[ column_definitions ]?
[ create_table_options ]?
(AS [ query ])?
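A parse-only sketch, assuming a Spark build with Declarative Pipelines (hypothetical mv name; the statement is only executable as part of a pipeline definition):
scala> println(sqlParser.parsePlan("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t1"))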
CREATE VIEW¶
Creates either a CreateViewCommand or a CreateView logical operator for CREATE VIEW SQL statement, for temporary or persisted views, respectively
CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] view_name
[column_alias [COMMENT column_comment] (, column_alias [COMMENT column_comment])*]
[COMMENT view_comment]
[PARTITIONED ON identifierList]
[TBLPROPERTIES propertyList]
AS query
ANTLR labeled alternative: #createView
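A parse-only sketch of the temporary variant (hypothetical v1 name), which gives a CreateViewCommand:
scala> println(sqlParser.parsePlan("CREATE OR REPLACE TEMPORARY VIEW v1 AS SELECT 1 AS id"))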
CREATE TEMPORARY VIEW USING¶
Creates a CreateTempViewUsing logical operator for CREATE TEMPORARY VIEW USING SQL statement
ANTLR labeled alternative: #createTempViewUsing
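A parse-only sketch (hypothetical v2 name and /tmp/v2-data path):
scala> println(sqlParser.parsePlan("CREATE TEMPORARY VIEW v2 USING parquet OPTIONS (path '/tmp/v2-data')"))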
visitDescribeTable¶
Creates DescribeColumnCommand or DescribeTableCommand logical commands.
ANTLR labeled alternative: #describeTable
DescribeColumnCommand¶
DescribeColumnCommand logical command for DESCRIBE TABLE with a single column only (i.e. no PARTITION specification).
// Seq((0, 0, "zero"), (1, 1, "one")).toDF("id", "p1", "p2").write.partitionBy("p1", "p2").saveAsTable("t1")
val sqlCmd = "DESC EXTENDED t1 p1"
val plan = spark.sql(sqlCmd).queryExecution.logical
import org.apache.spark.sql.execution.command.DescribeColumnCommand
val cmd = plan.asInstanceOf[DescribeColumnCommand]
scala> println(cmd)
DescribeColumnCommand `t1`, [p1], true
DescribeTableCommand¶
DescribeTableCommand logical command for all other variants of DESCRIBE TABLE (i.e. no column)
// Seq((0, 0, "zero"), (1, 1, "one")).toDF("id", "p1", "p2").write.partitionBy("p1", "p2").saveAsTable("t1")
val sqlCmd = "DESC t1"
val plan = spark.sql(sqlCmd).queryExecution.logical
import org.apache.spark.sql.execution.command.DescribeTableCommand
val cmd = plan.asInstanceOf[DescribeTableCommand]
scala> println(cmd)
DescribeTableCommand `t1`, false
EXPLAIN¶
Creates an ExplainCommand logical command for the following:
EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
statement
EXPLAIN LOGICAL is currently not supported and reports an "Operation not allowed" ParseException.
ANTLR labeled alternative: #explain
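A parse-only sketch (the rendering of the ExplainCommand logical command varies across Spark versions):
scala> println(sqlParser.parsePlan("EXPLAIN EXTENDED SELECT 1"))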
SHOW CREATE TABLE¶
Creates ShowCreateTableCommand logical command for SHOW CREATE TABLE SQL statement.
SHOW CREATE TABLE tableIdentifier
ANTLR labeled alternative: #showCreateTable
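A parse-only sketch (t1 as in the examples above; the exact logical operator may differ across Spark versions):
scala> println(sqlParser.parsePlan("SHOW CREATE TABLE t1"))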
TRUNCATE TABLE¶
Creates TruncateTableCommand logical command for TRUNCATE TABLE SQL statement.
TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
ANTLR labeled alternative: #truncateTable
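A parse-only sketch (t1 and its p1 partition column as in the examples above):
scala> println(sqlParser.parsePlan("TRUNCATE TABLE t1 PARTITION (p1 = 0)"))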
withRepartitionByExpression¶
withRepartitionByExpression(
ctx: QueryOrganizationContext,
expressions: Seq[Expression],
query: LogicalPlan): LogicalPlan
withRepartitionByExpression creates a RepartitionByExpression logical operator (with an undefined number of partitions).
withRepartitionByExpression is part of AstBuilder abstraction.
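DISTRIBUTE BY is one way to trigger withRepartitionByExpression. A parse-only sketch (t1 as in the examples above):
val plan = sqlParser.parsePlan("SELECT * FROM t1 DISTRIBUTE BY id")
// The root of the parsed plan is a RepartitionByExpression with no numPartitions
scala> println(plan.getClass.getSimpleName)
RepartitionByExpression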