Skip to content

Query

Query is a Statement.

Creating Instance

Query takes the following to be created:

  • NodeLocation
  • Select
  • Relation
  • WindowExpression
  • Expression
  • GroupBy
  • PartitionBy
  • Expression
  • RefinementInfo
  • pullQuery
  • Limit

Query is created when:

  • EngineExecutor is requested to sourceTablePlan
  • StatementRewriter.Rewriter is requested to visitQuery
  • AstBuilder.Visitor is requested to visitQuery

pullQuery Flag

Query is given a pullQuery flag when created (which is most importantly when AstBuilder.Visitor is requested to visitQuery).

AstBuilder.Visitor turns the pullQuery flag on (true) when the Query has no EMIT clause (and the buildingPersistentQuery internal flag is off).

isPullQuery

boolean isPullQuery()

isPullQuery is used when:

accept

R accept(
  AstVisitor<R, C> visitor,
  C context)

accept is part of the AstNode abstraction.


accept requests the given AstVisitor to visit a Query.

Demo

Create KsqlEngine

Create KsqlEngine with bootstrap.servers configuration property (to let the Injector resolve a stream source for a query).

import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)

val ksqlEngine = ???

Create Stream

val ksql = """
  CREATE STREAM orders (
    id bigint KEY,
    item varchar)
  WITH (
    kafka_topic = 'orders_topic',
    value_format = 'json',
    partitions = 2);
"""

val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)
import io.confluent.ksql.statement.Injectors
val serviceContext = ksqlEngine.getServiceContext
val injector = Injectors.DEFAULT(ksqlEngine, serviceContext)
val executionOverrides = Map.empty[String, String].asJava;

import io.confluent.ksql.statement.ConfiguredStatement
import io.confluent.ksql.config.SessionConfig
val preconfigured = ConfiguredStatement.of(
  prepared,
  SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)

ksqlEngine.execute(serviceContext, configured)

Pull Query

val ksql = "SELECT * FROM orders;"

val statements = ksqlEngine.parse(ksql)
val parsed = statements.asScala.head
val prepared = ksqlEngine.prepare(parsed)

val preconfigured = ConfiguredStatement.of(
  prepared,
  SessionConfig.of(ksqlConfig, executionOverrides))
val configured = injector.inject(preconfigured)

import io.confluent.ksql.parser.tree.Query
val query = configured.getStatement.asInstanceOf[Query]
assert(query.isPullQuery)