Query¶
Query is a Statement.
Creating Instance¶
Query takes the following to be created:
-
NodeLocation -
Select -
Relation -
WindowExpression -
Expression -
GroupBy -
PartitionBy -
Expression -
RefinementInfo - pullQuery
- Limit
Query is created when:
EngineExecutoris requested to sourceTablePlanStatementRewriter.Rewriteris requested tovisitQueryAstBuilder.Visitoris requested to visitQuery
pullQuery Flag¶
Query is given a pullQuery flag when created (which is most importantly when AstBuilder.Visitor is requested to visitQuery).
AstBuilder.Visitor turns the pullQuery flag on (true) when the Query has no EMIT clause (and the buildingPersistentQuery internal flag is off).
isPullQuery¶
boolean isPullQuery()
isPullQuery is used when:
Analyzer.Visitoris requested to visitQueryQueryAnalyzeris requested to analyzeEngineExecutoris requested to executeTablePullQueryStatementRewriter.Rewriteris requested tovisitQuerySqlFormatter.Formatteris requested tovisitQueryQueryExecutoris requested to handleQueryScalablePushUtilis requested to isScalablePushQuery
accept¶
R accept(
AstVisitor<R, C> visitor,
C context)
accept is part of the AstNode abstraction.
accept requests the given AstVisitor to visit a Query.
Demo¶
Create KsqlEngine¶
Create KsqlEngine with bootstrap.servers configuration property (to let the Injector resolve a stream source for a query).
import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)
val ksqlEngine = ???
Create Stream¶
val ksql = """
CREATE STREAM orders (
id bigint KEY,
item varchar)
WITH (
kafka_topic = 'orders_topic',
value_format = 'json',
partitions = 2);
"""
val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)
import io.confluent.ksql.statement.Injectors
val serviceContext = ksqlEngine.getServiceContext
val injector = Injectors.DEFAULT(ksqlEngine, serviceContext)
val executionOverrides = Map.empty[String, String].asJava;
import io.confluent.ksql.statement.ConfiguredStatement
import io.confluent.ksql.config.SessionConfig
val preconfigured = ConfiguredStatement.of(
prepared,
SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)
ksqlEngine.execute(serviceContext, configured)
Pull Query¶
val ksql = "SELECT * FROM orders;"
val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)
val preconfigured = ConfiguredStatement.of(
prepared,
SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)
import io.confluent.ksql.parser.tree.Query
val query = configured.getStatement.asInstanceOf[Query]
assert(query.isPullQuery)