
KsqlEngine

KsqlEngine is the ksqlDB execution engine in all the available execution modes (Embedded, Headless and REST).

KsqlEngine is a facade (frontend) of the EngineContext.

KsqlEngine is used to parse, validate (in a sandbox) and execute ksqlDB statements (including starting persistent queries).

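| Phase      | Embedded    | Headless           | REST                         |
|------------|-------------|--------------------|------------------------------|
| Parsing    | KsqlContext | StandaloneExecutor | StatementParser              |
| Validation | KsqlContext | StandaloneExecutor |                              |
| Execution  | KsqlContext | StandaloneExecutor | InteractiveStatementExecutor |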

Creating Instance

KsqlEngine takes the following to be created:

KsqlEngine is created when:

ProcessingLogContext

KsqlEngine is given a ProcessingLogContext when created.

The ProcessingLogContext is used to create a primary context (EngineContext).

Service ID

KsqlEngine can be given a Service ID when created. Unless given, the Service ID comes from the ServiceInfo, based on the ksql.service.id configuration property.
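The fallback rule can be sketched in a few lines. This is a minimal, self-contained Java model, not ksqlDB code: ServiceIdSketch and resolveServiceId are hypothetical names, the configuration is modelled as a plain Map, and failing on a missing property is an assumption of the sketch.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of how the service ID is chosen:
// an explicitly given ID wins, otherwise ksql.service.id from the configuration is used.
final class ServiceIdSketch {
  static final String SERVICE_ID_CONFIG = "ksql.service.id";

  static String resolveServiceId(Optional<String> given, Map<String, Object> config) {
    // Prefer the ID passed in at construction time.
    if (given.isPresent()) {
      return given.get();
    }
    // Otherwise fall back to the configuration property (assumed to be present).
    Object fromConfig = config.get(SERVICE_ID_CONFIG);
    if (fromConfig == null) {
      throw new IllegalStateException(SERVICE_ID_CONFIG + " is not set");
    }
    return fromConfig.toString();
  }
}
```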

The service ID is used (via the getServiceId getter) to create the following:

MutableMetaStore

KsqlEngine can be given a MutableMetaStore when created.

The MutableMetaStore is used to create the EngineContext.

EngineContext

KsqlEngine creates an EngineContext when created.

KsqlEngine delegates most of its processing to the EngineContext (directly or indirectly using an EngineExecutor) and is (pretty much) its facade.
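The facade relationship can be pictured with a small Java sketch. Every class here (EngineContextSketch, EngineExecutorSketch, KsqlEngineFacadeSketch) is hypothetical and far simpler than the real ones, and the QUERY_n IDs are made up. The only assumption modelled is that the engine keeps one shared context, reads state from it directly, and creates a short-lived executor per request that works against that context.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EngineContext: owns the engine state.
final class EngineContextSketch {
  final List<String> persistentQueries = new ArrayList<>();
}

// Hypothetical stand-in for EngineExecutor: created per request, works on the shared context.
final class EngineExecutorSketch {
  private final EngineContextSketch context;

  EngineExecutorSketch(EngineContextSketch context) {
    this.context = context;
  }

  // Simplification: every executed statement registers one persistent query.
  String execute(String statement) {
    String queryId = "QUERY_" + context.persistentQueries.size();
    context.persistentQueries.add(queryId);
    return queryId;
  }
}

// Hypothetical stand-in for KsqlEngine: a facade with no logic of its own.
final class KsqlEngineFacadeSketch {
  private final EngineContextSketch primaryContext = new EngineContextSketch();

  // Indirect delegation: a fresh executor per call, bound to the shared context.
  String execute(String statement) {
    return new EngineExecutorSketch(primaryContext).execute(statement);
  }

  // Direct delegation: read the state straight from the context.
  List<String> getPersistentQueries() {
    return List.copyOf(primaryContext.persistentQueries);
  }
}
```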

KsqlEngineMetrics

KsqlEngine creates a KsqlEngineMetrics (using the engineMetricsFactory) when created.

The KsqlEngineMetrics is used for the following:

Runtime Metrics

KsqlEngine creates a single-threaded Java executor that updates the performance metrics (using the KsqlEngineMetrics) every second.

KsqlEngine stops updating metrics when closed.
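A minimal sketch of such a metrics loop using the JDK's ScheduledExecutorService, assuming a fixed-rate schedule. MetricsUpdaterSketch is a hypothetical name; the period is a constructor parameter so the example can run quickly (the text above says KsqlEngine uses 1 second, i.e. 1000 ms), and the update only counts invocations instead of calling into KsqlEngineMetrics.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the runtime-metrics loop: one background thread
// runs updateMetrics() at a fixed rate until close() is called.
final class MetricsUpdaterSketch implements AutoCloseable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger updates = new AtomicInteger();

  MetricsUpdaterSketch(long periodMillis) {
    executor.scheduleAtFixedRate(
        this::updateMetrics, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  // Stand-in for the KsqlEngineMetrics update; here it only counts calls.
  private void updateMetrics() {
    updates.incrementAndGet();
  }

  int updateCount() {
    return updates.get();
  }

  boolean isStopped() {
    return executor.isShutdown();
  }

  // Mirrors "KsqlEngine stops updating metrics when closed".
  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

One design point worth noting: if a task scheduled with scheduleAtFixedRate throws, its later executions are suppressed, so a production updater would catch exceptions inside the task.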

Executing Statement

ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement) // plans the statement and creates a ConfiguredKsqlPlan for the other execute
ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredKsqlPlan plan)

execute is part of the KsqlExecutionContext abstraction.


execute creates an EngineExecutor to execute the KsqlPlan (of the given ConfiguredKsqlPlan).
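The relationship between the two execute variants (the statement variant plans first, then hands the plan to the plan variant) can be sketched in plain Java. PlanSketch and ExecuteOverloadsSketch are hypothetical stand-ins for ConfiguredKsqlPlan and the engine, and the "COMMAND[...]" text is made up for illustration.

```java
// Hypothetical stand-in for a ConfiguredKsqlPlan: the statement text plus
// the command that planning produced for it.
final class PlanSketch {
  final String statementText;
  final String command;

  PlanSketch(String statementText, String command) {
    this.statementText = statementText;
    this.command = command;
  }
}

final class ExecuteOverloadsSketch {
  // Stand-in for plan(...): turns a statement into a plan without running it.
  PlanSketch plan(String statement) {
    return new PlanSketch(statement, "COMMAND[" + statement.trim() + "]");
  }

  // Mirrors execute(ServiceContext, ConfiguredStatement<?>): plan first,
  // then hand the plan over to the other overload.
  String execute(String statement) {
    return execute(plan(statement));
  }

  // Mirrors execute(ServiceContext, ConfiguredKsqlPlan): run an existing plan.
  String execute(PlanSketch plan) {
    return "executed " + plan.command;
  }
}
```

Splitting planning from execution means a plan can be produced once and executed later; whether and where ksqlDB relies on that is beyond what this page states.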

Statement Planning (plan)

KsqlPlan plan(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement)

plan is part of the KsqlExecutionContext abstraction.


plan creates an EngineExecutor to plan the given ConfiguredStatement.

getAllLiveQueries

List<QueryMetadata> getAllLiveQueries()

getAllLiveQueries is part of the KsqlExecutionContext abstraction.


getAllLiveQueries requests the EngineContext for the QueryRegistry that is then requested for all live queries.
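A hypothetical QueryRegistry stand-in makes the lookup concrete. The assumption (not stated on this page) is that "live" queries are all registered queries, persistent and transient alike, while getPersistentQueries returns only the persistent ones. QueryRegistrySketch and the query IDs are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical QueryRegistry stand-in. Assumption: live queries = persistent + transient.
final class QueryRegistrySketch {
  private final List<String> persistentQueries = new ArrayList<>();
  private final List<String> transientQueries = new ArrayList<>();

  void registerPersistent(String queryId) {
    persistentQueries.add(queryId);
  }

  void registerTransient(String queryId) {
    transientQueries.add(queryId);
  }

  List<String> getPersistentQueries() {
    return List.copyOf(persistentQueries);
  }

  // All queries currently registered, persistent ones first.
  List<String> getAllLiveQueries() {
    List<String> all = new ArrayList<>(persistentQueries);
    all.addAll(transientQueries);
    return all;
  }
}
```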

Parsing SQL Statements

List<ParsedStatement> parse(
  String sql)

parse is part of the KsqlExecutionContext abstraction.


parse requests the EngineContext to parse the given SQL statements (into a collection of ParsedStatements).
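To make "a collection of ParsedStatements" concrete, here is a deliberately naive Java sketch that splits SQL text into statement strings on semicolons outside single-quoted literals. It is a swapped-in illustration, not how ksqlDB parses (ksqlDB uses an ANTLR-generated grammar); StatementSplitterSketch is a hypothetical name, and comments or quoted identifiers containing semicolons are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Naive illustration only: splits SQL text on ';' while ignoring semicolons
// inside single-quoted string literals. Not ksqlDB's parsing algorithm.
final class StatementSplitterSketch {
  static List<String> split(String sql) {
    List<String> statements = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    boolean inQuotes = false;
    for (char c : sql.toCharArray()) {
      if (c == '\'') {
        // An escaped quote ('') toggles twice, so the state stays inside the literal.
        inQuotes = !inQuotes;
      }
      if (c == ';' && !inQuotes) {
        addIfNotBlank(statements, current.toString());
        current.setLength(0);
      } else {
        current.append(c);
      }
    }
    addIfNotBlank(statements, current.toString());
    return statements;
  }

  private static void addIfNotBlank(List<String> out, String text) {
    String trimmed = text.trim();
    if (!trimmed.isEmpty()) {
      out.add(trimmed);
    }
  }
}
```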

Preparing Statement for Execution

PreparedStatement<?> prepare(
  ParsedStatement stmt,
  Map<String, String> variablesMap)

prepare is part of the KsqlExecutionContext abstraction.


prepare requests the EngineContext to prepare the given ParsedStatement.
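The variablesMap argument presumably carries values for ksqlDB variable references of the form ${name} (as defined with DEFINE). How prepare applies them is not described here, so the following is only a sketch of ${name} substitution under stated assumptions: VariableSubstitutionSketch is hypothetical, names are matched with a simple identifier pattern, and unknown variables are left untouched.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: replace ${name} references using a variables map.
// Unknown variables are left as-is (an assumption of this sketch).
final class VariableSubstitutionSketch {
  private static final Pattern VARIABLE =
      Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  static String substitute(String text, Map<String, String> variables) {
    Matcher m = VARIABLE.matcher(text);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      String value = variables.get(m.group(1));
      String replacement = value != null ? value : m.group(0);
      // quoteReplacement keeps '$' and '\' in values from being interpreted.
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }
}
```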

Executing Scalable Push Query

ScalablePushQueryMetadata executeScalablePushQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  PushRouting pushRouting,
  PushRoutingOptions pushRoutingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Context context,
  Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)

executeScalablePushQuery is part of the KsqlExecutionContext abstraction.


executeScalablePushQuery creates an EngineExecutor to execute a scalable push query.

Executing Table Pull Query

PullQueryResult executeTablePullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  HARouting routing,
  RoutingOptions routingOptions,
  QueryPlannerOptions plannerOptions,
  Optional<PullQueryExecutorMetrics> pullQueryMetrics,
  boolean startImmediately,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

executeTablePullQuery is part of the KsqlExecutionContext abstraction.


executeTablePullQuery...FIXME

isExecutableStatement

boolean isExecutableStatement(
  Statement statement)

isExecutableStatement returns true when the given Statement is one of the following:


isExecutableStatement is used when:

Analyzing Query Statement With No Sink

ImmutableAnalysis analyzeQueryWithNoOutputTopic(
  Query query,
  String queryText,
  Map<String, Object> configOverrides)

analyzeQueryWithNoOutputTopic creates a QueryAnalyzer with the following:

analyzeQueryWithNoOutputTopic requests the QueryAnalyzer to analyze the given Query statement (with no Sink).

In the end, analyzeQueryWithNoOutputTopic creates a RewrittenAnalysis (with the Analysis and a QueryExecutionUtil.ColumnReferenceRewriter).


analyzeQueryWithNoOutputTopic is used when:

getRowpartitionRowoffsetEnabled

boolean getRowpartitionRowoffsetEnabled(
  KsqlConfig ksqlConfig,
  Map<String, Object> configOverrides)

getRowpartitionRowoffsetEnabled takes the value of ksql.rowpartition.rowoffset.enabled from the given configOverrides if defined there, and falls back to the given KsqlConfig otherwise.
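The lookup order can be sketched as follows, assuming both the KsqlConfig and the overrides are modelled as plain Maps (the real KsqlConfig is a class, not a Map). RowOffsetFlagSketch is a hypothetical name, and treating a missing value as false is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical sketch: a per-request override wins, otherwise the config value is used.
final class RowOffsetFlagSketch {
  static final String KEY = "ksql.rowpartition.rowoffset.enabled";

  static boolean getRowpartitionRowoffsetEnabled(
      Map<String, Object> ksqlConfig, Map<String, Object> configOverrides) {
    Object value = configOverrides.containsKey(KEY)
        ? configOverrides.get(KEY)
        : ksqlConfig.get(KEY);
    // Overrides may arrive as strings ("true") or booleans; a missing value reads as false.
    return Boolean.parseBoolean(String.valueOf(value));
  }
}
```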

Executing Transient Query

TransientQueryMetadata executeTransientQuery(
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones)

executeTransientQuery is part of the KsqlExecutionContext abstraction.


executeTransientQuery creates an EngineExecutor to execute the transient query.

Creating Stream Pull Query

StreamPullQueryMetadata createStreamPullQuery(
  ServiceContext serviceContext,
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statementOrig,
  boolean excludeTombstones)

createStreamPullQuery uses ksql.query.pull.stream.enabled to ensure that pull queries on streams are enabled. If not, createStreamPullQuery throws a KsqlStatementException:

Pull queries on streams are disabled.
To create a push query on the stream, add EMIT CHANGES to the end.
To enable pull queries on streams, set the ksql.query.pull.stream.enabled config to 'true'.
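The guard can be sketched as a simple configuration check. StatementExceptionSketch stands in for KsqlStatementException and StreamPullQueryGuardSketch is a hypothetical name; the message text is the one quoted above, with its line breaks joined by spaces (the exact whitespace in ksqlDB is an assumption), and the configuration is modelled as a plain Map.

```java
import java.util.Map;

// Hypothetical stand-in for KsqlStatementException.
final class StatementExceptionSketch extends RuntimeException {
  StatementExceptionSketch(String message) {
    super(message);
  }
}

// Sketch of the check at the start of createStreamPullQuery: refuse to run
// a pull query on a stream unless ksql.query.pull.stream.enabled is true.
final class StreamPullQueryGuardSketch {
  static final String ENABLED_CONFIG = "ksql.query.pull.stream.enabled";

  static void checkEnabled(Map<String, Object> config) {
    boolean enabled = Boolean.parseBoolean(String.valueOf(config.get(ENABLED_CONFIG)));
    if (!enabled) {
      throw new StatementExceptionSketch(
          "Pull queries on streams are disabled. "
              + "To create a push query on the stream, add EMIT CHANGES to the end. "
              + "To enable pull queries on streams, set the " + ENABLED_CONFIG
              + " config to 'true'.");
    }
  }
}
```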

createStreamPullQuery...FIXME

createStreamPullQuery creates an EngineExecutor to execute the stream pull query.

createStreamPullQuery...FIXME

In the end, createStreamPullQuery returns a StreamPullQueryMetadata with the TransientQueryMetadata and the endOffsets.


createStreamPullQuery is used when:

updateStreamsPropertiesAndRestartRuntime

void updateStreamsPropertiesAndRestartRuntime()

updateStreamsPropertiesAndRestartRuntime is part of the KsqlExecutionContext abstraction.


updateStreamsPropertiesAndRestartRuntime...FIXME

getMetaStore

MetaStore getMetaStore()

getMetaStore is part of the KsqlExecutionContext abstraction.


getMetaStore requests the EngineContext for the MetaStore.

getPersistentQueries

List<PersistentQueryMetadata> getPersistentQueries()

getPersistentQueries is part of the KsqlExecutionContext abstraction.


getPersistentQueries requests the EngineContext for the QueryRegistry that is then requested for the persistent queries.

Demo: Creating KsqlEngine

import io.confluent.ksql.util.KsqlConfig
val ksqlConfig = KsqlConfig.empty

import io.confluent.ksql.services._
val serviceContext = ServiceContextFactory.create(ksqlConfig, () => DisabledKsqlClient.instance)

import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()

import io.confluent.ksql.function.InternalFunctionRegistry
val functionRegistry = new InternalFunctionRegistry()

import io.confluent.ksql.ServiceInfo
val serviceInfo = ServiceInfo.create(ksqlConfig)

import io.confluent.ksql.query.id.SequentialQueryIdGenerator
val queryIdGenerator = new SequentialQueryIdGenerator()

import io.confluent.ksql.engine.QueryEventListener
import scala.jdk.CollectionConverters._
val queryEventListeners = Seq.empty[QueryEventListener].asJava

import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()
import io.confluent.ksql.engine.KsqlEngine
val ksqlEngine = new KsqlEngine(
  serviceContext,
  processingLogContext,
  functionRegistry,
  serviceInfo,
  queryIdGenerator,
  ksqlConfig,
  queryEventListeners,
  metricCollectors)

Logging

Enable ALL logging level for the io.confluent.ksql.engine.KsqlEngine logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.io.confluent.ksql.engine.KsqlEngine=ALL

Refer to Logging.