Query¶
Query is a Statement.
Creating Instance¶
Query takes the following to be created:
- NodeLocation
- Select
- Relation
- WindowExpression
- Expression
- GroupBy
- PartitionBy
- Expression
- RefinementInfo
- pullQuery
- Limit
Query is created when:
- EngineExecutor is requested to sourceTablePlan
- StatementRewriter.Rewriter is requested to visitQuery
- AstBuilder.Visitor is requested to visitQuery
pullQuery Flag¶
Query is given a pullQuery flag when created (most importantly, when AstBuilder.Visitor is requested to visitQuery).
AstBuilder.Visitor turns the pullQuery flag on (true) when the Query has no EMIT clause (and the buildingPersistentQuery internal flag is off).
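The rule above can be sketched in a few lines of Scala. This is only an illustration of the condition as described here, not ksqlDB's code: ParsedQuery, hasEmitClause and isPull are hypothetical names introduced for the example.

```scala
object PullQueryFlagSketch {
  // Hypothetical stand-in for what the parser knows about a query;
  // not part of ksqlDB's API.
  final case class ParsedQuery(hasEmitClause: Boolean)

  // A query is flagged as a pull query when it has no EMIT clause
  // and it is not being built as part of a persistent query.
  def isPull(parsed: ParsedQuery, buildingPersistentQuery: Boolean): Boolean =
    !parsed.hasEmitClause && !buildingPersistentQuery
}
```

Under these assumptions, a query without an EMIT clause is flagged only while buildingPersistentQuery is off, and a query with an EMIT clause is never flagged.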
isPullQuery¶
boolean isPullQuery()
isPullQuery is used when:
- Analyzer.Visitor is requested to visitQuery
- QueryAnalyzer is requested to analyze
- EngineExecutor is requested to executeTablePullQuery
- StatementRewriter.Rewriter is requested to visitQuery
- SqlFormatter.Formatter is requested to visitQuery
- QueryExecutor is requested to handleQuery
- ScalablePushUtil is requested to isScalablePushQuery
accept¶
R accept(
  AstVisitor<R, C> visitor,
  C context)
accept is part of the AstNode abstraction.
accept requests the given AstVisitor to visit a Query.
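The double dispatch behind accept can be sketched with a simplified Scala model. The traits below are hypothetical miniatures that only borrow the names AstNode, AstVisitor and Query; the real ksqlDB types have many more members, and Describe is an invented visitor used for illustration.

```scala
object AcceptSketch {
  // Simplified visitor contract with a single visit method (assumption).
  trait AstVisitor[R, C] {
    def visitQuery(node: Query, context: C): R
  }

  // Simplified node abstraction: every node knows how to accept a visitor.
  trait AstNode {
    def accept[R, C](visitor: AstVisitor[R, C], context: C): R
  }

  // Miniature Query that carries only the pullQuery flag.
  final case class Query(pullQuery: Boolean) extends AstNode {
    // Double dispatch: the node selects the visitor method for its own type.
    override def accept[R, C](visitor: AstVisitor[R, C], context: C): R =
      visitor.visitQuery(this, context)
  }

  // Hypothetical visitor that labels a query, prefixed with the context string.
  object Describe extends AstVisitor[String, String] {
    override def visitQuery(node: Query, context: String): String =
      context + (if (node.pullQuery) "pull" else "not pull")
  }
}
```

With this model, AcceptSketch.Query(pullQuery = true).accept(AcceptSketch.Describe, "kind=") hands control to Describe.visitQuery, which is the same hand-off that accept performs for a Query.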
Demo¶
Create KsqlEngine¶
Create a KsqlEngine with the bootstrap.servers configuration property (to let the Injector resolve a stream source for a query).
import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)
// ??? is a placeholder: supply a KsqlEngine instance for your environment
val ksqlEngine = ???
Create Stream¶
val ksql = """
  CREATE STREAM orders (
    id bigint KEY,
    item varchar)
  WITH (
    kafka_topic = 'orders_topic',
    value_format = 'json',
    partitions = 2);
"""
// Parse the SQL text and prepare the first (and only) statement
val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)
// Build the default injector chain
import io.confluent.ksql.statement.Injectors
val serviceContext = ksqlEngine.getServiceContext
val injector = Injectors.DEFAULT(ksqlEngine, serviceContext)
// No per-request property overrides
val executionOverrides = Map.empty[String, String].asJava
import io.confluent.ksql.statement.ConfiguredStatement
import io.confluent.ksql.config.SessionConfig
// Attach the session configuration, run the injectors, and execute
val preconfigured = ConfiguredStatement.of(
  prepared,
  SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)
ksqlEngine.execute(serviceContext, configured)
Pull Query¶
// No EMIT clause, so the parser should flag this as a pull query
val ksql = "SELECT * FROM orders;"
val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)
val preconfigured = ConfiguredStatement.of(
  prepared,
  SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)
import io.confluent.ksql.parser.tree.Query
// The prepared statement is a Query with the pullQuery flag on
val query = configured.getStatement.asInstanceOf[Query]
assert(query.isPullQuery)