Skip to content


KsqlEngine is the ksqlDB execution engine in all the available execution modes:

KsqlEngine is a facade (frontend) of the EngineContext.

KsqlEngine is used to parse, validate (executed in a sandbox) and execute ksql statements (with starting persistent queries).

Phase Embedded Headless REST
Parsing KsqlContext StandaloneExecutor StatementParser
Validation KsqlContext StandaloneExecutor
Execution KsqlContext StandaloneExecutor InteractiveStatementExecutor

Creating Instance

KsqlEngine takes the following to be created:

KsqlEngine is created when:

Service ID

KsqlEngine can be given a Service ID when created. Unless defined, the Service ID is from ServiceInfo based on configuration property.

The service ID is used (via getServiceId getter) to create the following:


KsqlEngine can be given a MutableMetaStore when created.

The MutableMetaStore is used to create the EngineContext.


KsqlEngine creates an EngineContext when created.

KsqlEngine delegates most of its processing to the EngineContext (directly or indirectly usingEngineExecutor) and is (pretty much) their facade.


KsqlEngine creates a KsqlEngineMetrics (using the engineMetricsFactory) when created.

The KsqlEngineMetrics is used for the following:

Runtime Metrics

KsqlEngine creates a single-threaded Java Executor for updating performance metrics (using KsqlEngineMetrics) every 1 second.

KsqlEngine stops updating metrics when closed.

Executing Statement

ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement) // (1)!
ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredKsqlPlan plan)
  1. Plans the statement and creates a ConfiguredKsqlPlan for the other execute

execute is part of the KsqlExecutionContext abstraction.

execute creates an EngineExecutor to execute the KsqlPlan (of the given ConfiguredKsqlPlan).

Statement Planning (plan)

KsqlPlan plan(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement)

plan is part of the KsqlExecutionContext abstraction.

plan creates an EngineExecutor to plan the given ConfiguredStatement.


List<QueryMetadata> getAllLiveQueries()

getAllLiveQueries is part of the KsqlExecutionContext abstraction.

getAllLiveQueries requests the EngineContext for the QueryRegistry that is then requested for all live queries.

Parsing SQL Statements

List<ParsedStatement> parse(
  String sql)

parse is part of the KsqlExecutionContext abstraction.

parse requests the EngineContext to parse the given SQL statements (into a collection of ParsedStatements).

Preparing Statement for Execution

PreparedStatement<?> prepare(
  ParsedStatement stmt,
  Map<String, String> variablesMap)

prepare is part of the KsqlExecutionContext abstraction.

prepare requests the EngineContext to prepare the given ParsedStatement.

Executing Scalable Push Query

ScalablePushQueryMetadata executeScalablePushQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  PushRouting pushRouting,
  PushRoutingOptions pushRoutingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Context context,
  Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)

executeScalablePushQuery is part of the KsqlExecutionContext abstraction.

executeScalablePushQuery creates an EngineExecutor to execute a scalable push query.

Executing Table Pull Query

PullQueryResult executeTablePullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  HARouting routing,
  RoutingOptions routingOptions,
  QueryPlannerOptions plannerOptions,
  Optional<PullQueryExecutorMetrics> pullQueryMetrics,
  boolean startImmediately,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

executeTablePullQuery is part of the KsqlExecutionContext abstraction.



boolean isExecutableStatement(
  Statement statement)

isExecutableStatement is positive (true) when the given Statement is one of the following:

isExecutableStatement is used when:

Analyzing Query Statement With No Sink

ImmutableAnalysis analyzeQueryWithNoOutputTopic(
  Query query,
  String queryText,
  Map<String, Object> configOverrides)

analyzeQueryWithNoOutputTopic creates a QueryAnalyzer with the following:

analyzeQueryWithNoOutputTopic requests the QueryAnalyzer to analyze the given Query statement (and no Sink).

In the end, analyzeQueryWithNoOutputTopic creates a RewrittenAnalysis (with the Analysis and QueryExecutionUtil.ColumnReferenceRewriter)

analyzeQueryWithNoOutputTopic is used when:


boolean getRowpartitionRowoffsetEnabled(
  KsqlConfig ksqlConfig,
  Map<String, Object> configOverrides)

getRowpartitionRowoffsetEnabled takes the value of ksql.rowpartition.rowoffset.enabled from configOverrides (if available) or the given KsqlConfig.

Executing Transient Query

TransientQueryMetadata executeTransientQuery(
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones)

executeTransientQuery is part of the KsqlExecutionContext abstraction.

executeTransientQuery creates an EngineExecutor to executeTransientQuery.

Creating Stream Pull Query

StreamPullQueryMetadata createStreamPullQuery(
  ServiceContext serviceContext,
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statementOrig,
  boolean excludeTombstones)

createStreamPullQuery uses to ensure that pull queries on streams are enabled. If not, createStreamPullQuery throws a KsqlStatementException:

Pull queries on streams are disabled.
To create a push query on the stream, add EMIT CHANGES to the end.
To enable pull queries on streams, set the config to 'true'.


createStreamPullQuery creates an EngineExecutor to execute the stream pull query.


In the end, createStreamPullQuery returns a StreamPullQueryMetadata with the TransientQueryMetadata and the endOffsets.

createStreamPullQuery is used when:


void updateStreamsPropertiesAndRestartRuntime()

updateStreamsPropertiesAndRestartRuntime is part of the KsqlExecutionContext abstraction.



MetaStore getMetaStore()

getMetaStore is part of the KsqlExecutionContext abstraction.

getMetaStore requests the EngineContext for the MetaStore


List<PersistentQueryMetadata> getPersistentQueries()

getPersistentQueries is part of the KsqlExecutionContext abstraction.

getPersistentQueries requests the EngineContext for the QueryRegistry for the persistent queries.

Demo: Creating KsqlEngine

import io.confluent.ksql.util.KsqlConfig
val ksqlConfig = KsqlConfig.empty

val serviceContext = ServiceContextFactory.create(ksqlConfig, () => DisabledKsqlClient.instance)

import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()

import io.confluent.ksql.function.InternalFunctionRegistry
val functionRegistry = new InternalFunctionRegistry()

import io.confluent.ksql.ServiceInfo
val serviceInfo = ServiceInfo.create(ksqlConfig)

val queryIdGenerator = new SequentialQueryIdGenerator()

import io.confluent.ksql.engine.QueryEventListener
import scala.jdk.CollectionConverters._
val queryEventListeners = Seq.empty[QueryEventListener].asJava

import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()
import io.confluent.ksql.engine.KsqlEngine
val ksqlEngine = new KsqlEngine(


Enable ALL logging level for io.confluent.ksql.engine.KsqlEngine logger to see what happens inside.

Add the following line to

Refer to Logging.