KsqlEngine¶
KsqlEngine is the ksqlDB execution engine in all the available execution modes:
- Embedded (for KsqlContext)
- Headless (for StandaloneExecutor)
- REST (for KsqlRestApplication)
KsqlEngine is a facade (frontend) of the EngineContext.
KsqlEngine is used to parse, validate (executed in a sandbox) and execute ksql statements (with starting persistent queries).
| Phase | Embedded | Headless | REST | 
|---|---|---|---|
| Parsing | KsqlContext | StandaloneExecutor | StatementParser | 
| Validation | KsqlContext | StandaloneExecutor | |
| Execution | KsqlContext | StandaloneExecutor | InteractiveStatementExecutor | 
Creating Instance¶
KsqlEngine takes the following to be created:
- ServiceContext
- ProcessingLogContext
- Service ID
- MutableMetaStore
- Function to create a KsqlEngineMetrics
-  QueryIdGenerator
- KsqlConfig
-  QueryEventListeners
KsqlEngine is created when:
- KsqlContextis requested to create
- KsqlRestApplicationis requested to buildApplication
- StandaloneExecutorFactoryis requested to create
ProcessingLogContext¶
KsqlEngine is given a ProcessingLogContext when created.
The ProcessingLogContext is used to create a primary context (EngineContext).
Service ID¶
KsqlEngine can be given a Service ID when created. Unless defined, the Service ID is from ServiceInfo based on ksql.service.id configuration property.
The service ID is used (via getServiceId getter) to create the following:
MutableMetaStore¶
KsqlEngine can be given a MutableMetaStore when created.
The MutableMetaStore is used to create the EngineContext.
EngineContext¶
KsqlEngine creates an EngineContext when created.
KsqlEngine delegates most of its processing to the EngineContext (directly or indirectly usingEngineExecutor) and is (pretty much) their facade.
KsqlEngineMetrics¶
KsqlEngine creates a KsqlEngineMetrics (using the engineMetricsFactory) when created.
The KsqlEngineMetrics is used for the following:
- Accessing the QueryStateMetricsReportingListener when this KsqlEngineis created
- Updating runtime metrics
Runtime Metrics¶
KsqlEngine creates a single-threaded Java Executor for updating performance metrics (using KsqlEngineMetrics) every 1 second.
KsqlEngine stops updating metrics when closed.
Executing Statement¶
ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement) // (1)!
ExecuteResult execute(
  ServiceContext serviceContext,
  ConfiguredKsqlPlan plan)
- Plans the statement and creates a ConfiguredKsqlPlanfor the otherexecute
execute is part of the KsqlExecutionContext abstraction.
execute creates an EngineExecutor to execute the KsqlPlan (of the given ConfiguredKsqlPlan).
Statement Planning (plan)¶
KsqlPlan plan(
  ServiceContext serviceContext,
  ConfiguredStatement<?> statement)
plan is part of the KsqlExecutionContext abstraction.
plan creates an EngineExecutor to plan the given ConfiguredStatement.
getAllLiveQueries¶
List<QueryMetadata> getAllLiveQueries()
getAllLiveQueries is part of the KsqlExecutionContext abstraction.
getAllLiveQueries requests the EngineContext for the QueryRegistry that is then requested for all live queries.
Parsing SQL Statements¶
List<ParsedStatement> parse(
  String sql)
parse is part of the KsqlExecutionContext abstraction.
parse requests the EngineContext to parse the given SQL statements (into a collection of ParsedStatements).
Preparing Statement for Execution¶
PreparedStatement<?> prepare(
  ParsedStatement stmt,
  Map<String, String> variablesMap)
prepare is part of the KsqlExecutionContext abstraction.
prepare requests the EngineContext to prepare the given ParsedStatement.
Executing Scalable Push Query¶
ScalablePushQueryMetadata executeScalablePushQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  PushRouting pushRouting,
  PushRoutingOptions pushRoutingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Context context,
  Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)
executeScalablePushQuery is part of the KsqlExecutionContext abstraction.
executeScalablePushQuery creates an EngineExecutor to execute a scalable push query.
Executing Table Pull Query¶
PullQueryResult executeTablePullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  HARouting routing,
  RoutingOptions routingOptions,
  QueryPlannerOptions plannerOptions,
  Optional<PullQueryExecutorMetrics> pullQueryMetrics,
  boolean startImmediately,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)
executeTablePullQuery is part of the KsqlExecutionContext abstraction.
executeTablePullQuery...FIXME
isExecutableStatement¶
boolean isExecutableStatement(
  Statement statement)
isExecutableStatement is positive (true) when the given Statement is one of the following:
isExecutableStatement is used when:
- EngineExecutoris requested to plan a statement (and throwOnNonExecutableStatement)
- RequestValidatoris requested to validate a statement
Analyzing Query Statement With No Sink¶
ImmutableAnalysis analyzeQueryWithNoOutputTopic(
  Query query,
  String queryText,
  Map<String, Object> configOverrides)
analyzeQueryWithNoOutputTopic creates a QueryAnalyzer with the following:
analyzeQueryWithNoOutputTopic requests the QueryAnalyzer to analyze the given Query statement (and no Sink).
In the end, analyzeQueryWithNoOutputTopic creates a RewrittenAnalysis (with the Analysis and QueryExecutionUtil.ColumnReferenceRewriter)
analyzeQueryWithNoOutputTopic is used when:
- QueryExecutoris requested to handle pull or push queries
getRowpartitionRowoffsetEnabled¶
boolean getRowpartitionRowoffsetEnabled(
  KsqlConfig ksqlConfig,
  Map<String, Object> configOverrides)
getRowpartitionRowoffsetEnabled takes the value of ksql.rowpartition.rowoffset.enabled from configOverrides (if available) or the given KsqlConfig.
Executing Transient Query¶
TransientQueryMetadata executeTransientQuery(
  ServiceContext serviceContext,
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones)
executeTransientQuery is part of the KsqlExecutionContext abstraction.
executeTransientQuery creates an EngineExecutor to executeTransientQuery.
Creating Stream Pull Query¶
StreamPullQueryMetadata createStreamPullQuery(
  ServiceContext serviceContext,
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statementOrig,
  boolean excludeTombstones)
createStreamPullQuery uses ksql.query.pull.stream.enabled to ensure that pull queries on streams are enabled. If not, createStreamPullQuery throws a KsqlStatementException:
Pull queries on streams are disabled.
To create a push query on the stream, add EMIT CHANGES to the end.
To enable pull queries on streams, set the ksql.query.pull.stream.enabled config to 'true'.
createStreamPullQuery...FIXME
createStreamPullQuery creates an EngineExecutor to execute the stream pull query.
createStreamPullQuery...FIXME
In the end, createStreamPullQuery returns a StreamPullQueryMetadata with the TransientQueryMetadata and the endOffsets.
createStreamPullQuery is used when:
- QueryExecutoris requested to handle a stream pull query
updateStreamsPropertiesAndRestartRuntime¶
void updateStreamsPropertiesAndRestartRuntime()
updateStreamsPropertiesAndRestartRuntime is part of the KsqlExecutionContext abstraction.
updateStreamsPropertiesAndRestartRuntime...FIXME
getMetaStore¶
MetaStore getMetaStore()
getMetaStore is part of the KsqlExecutionContext abstraction.
getMetaStore requests the EngineContext for the MetaStore
getPersistentQueries¶
List<PersistentQueryMetadata> getPersistentQueries()
getPersistentQueries is part of the KsqlExecutionContext abstraction.
getPersistentQueries requests the EngineContext for the QueryRegistry for the persistent queries.
Demo: Creating KsqlEngine¶
import io.confluent.ksql.util.KsqlConfig
val ksqlConfig = KsqlConfig.empty
import io.confluent.ksql.services._
val serviceContext = ServiceContextFactory.create(ksqlConfig, () => DisabledKsqlClient.instance)
import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()
import io.confluent.ksql.function.InternalFunctionRegistry
val functionRegistry = new InternalFunctionRegistry()
import io.confluent.ksql.ServiceInfo
val serviceInfo = ServiceInfo.create(ksqlConfig)
import io.confluent.ksql.query.id.SequentialQueryIdGenerator
val queryIdGenerator = new SequentialQueryIdGenerator()
import io.confluent.ksql.engine.QueryEventListener
import scala.jdk.CollectionConverters._
val queryEventListeners = Seq.empty[QueryEventListener].asJava
import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()
import io.confluent.ksql.engine.KsqlEngine
val ksqlEngine = new KsqlEngine(
  serviceContext,
  processingLogContext,
  functionRegistry,
  serviceInfo,
  queryIdGenerator,
  ksqlConfig,
  queryEventListeners,
  metricCollectors)
Logging¶
Enable ALL logging level for io.confluent.ksql.engine.KsqlEngine logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.engine.KsqlEngine=ALL
Refer to Logging.