KsqlEngine¶
KsqlEngine is the ksqlDB execution engine in all the available execution modes:
- Embedded (for KsqlContext)
- Headless (for StandaloneExecutor)
- REST (for KsqlRestApplication)
KsqlEngine is a facade (frontend) of the EngineContext.
KsqlEngine is used to parse, validate (in a sandbox) and execute ksql statements (including starting persistent queries).
Phase | Embedded | Headless | REST |
---|---|---|---|
Parsing | KsqlContext | StandaloneExecutor | StatementParser |
Validation | KsqlContext | StandaloneExecutor | |
Execution | KsqlContext | StandaloneExecutor | InteractiveStatementExecutor |
Creating Instance¶
KsqlEngine takes the following to be created:
- ServiceContext
- ProcessingLogContext
- Service ID
- MutableMetaStore
- Function to create a KsqlEngineMetrics
- QueryIdGenerator
- KsqlConfig
- QueryEventListeners
KsqlEngine is created when:
- KsqlContext is requested to create
- KsqlRestApplication is requested to buildApplication
- StandaloneExecutorFactory is requested to create
ProcessingLogContext¶
KsqlEngine is given a ProcessingLogContext when created.
The ProcessingLogContext is used to create a primary context (EngineContext).
Service ID¶
KsqlEngine can be given a Service ID when created. Unless defined, the Service ID is taken from ServiceInfo (based on the ksql.service.id configuration property).
The service ID is used (via the getServiceId getter) to create the following:
MutableMetaStore¶
KsqlEngine can be given a MutableMetaStore when created.
The MutableMetaStore is used to create the EngineContext.
EngineContext¶
KsqlEngine creates an EngineContext when created.
KsqlEngine delegates most of its processing to the EngineContext (directly or indirectly using EngineExecutor) and is (pretty much) a facade over both.
KsqlEngineMetrics¶
KsqlEngine creates a KsqlEngineMetrics (using the engineMetricsFactory) when created.
The KsqlEngineMetrics is used for the following:
- Accessing the QueryStateMetricsReportingListener when this KsqlEngine is created
- Updating runtime metrics
Runtime Metrics¶
KsqlEngine creates a single-threaded Java Executor for updating performance metrics (using KsqlEngineMetrics) every 1 second.
KsqlEngine stops updating metrics when closed.
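The periodic update described above can be pictured with a small, self-contained sketch. It is not the ksqlDB code: MetricsUpdaterSketch and its counter are hypothetical stand-ins for KsqlEngine and KsqlEngineMetrics, and only the JDK scheduling calls are real.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the engine's metrics updater (not a ksqlDB class).
final class MetricsUpdaterSketch implements AutoCloseable {
  // A single worker thread runs every scheduled update.
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  // Counts updates; stands in for the real metrics being refreshed.
  private final AtomicInteger updates = new AtomicInteger();
  private final CountDownLatch firstUpdate = new CountDownLatch(1);

  MetricsUpdaterSketch(long periodMillis) {
    // Run the update task repeatedly at a fixed rate.
    executor.scheduleAtFixedRate(() -> {
      updates.incrementAndGet();
      firstUpdate.countDown();
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  int updateCount() {
    return updates.get();
  }

  // Waits until the first update has run or the timeout elapses.
  boolean awaitFirstUpdate(long timeoutMillis) {
    try {
      return firstUpdate.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isStopped() {
    return executor.isShutdown();
  }

  // Closing stops any further updates.
  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

With a period of 1000 milliseconds (new MetricsUpdaterSketch(1000)) the sketch matches the 1-second interval mentioned above.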
Executing Statement¶
ExecuteResult execute(
ServiceContext serviceContext,
ConfiguredStatement<?> statement) // (1)!
ExecuteResult execute(
ServiceContext serviceContext,
ConfiguredKsqlPlan plan)
1. Plans the statement and creates a ConfiguredKsqlPlan for the other execute
execute is part of the KsqlExecutionContext abstraction.
execute creates an EngineExecutor to execute the KsqlPlan (of the given ConfiguredKsqlPlan).
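The relationship between the two execute variants (the statement-based one plans first, then hands the plan to the plan-based one) can be sketched as follows. All types here (Statement, Plan, Result, PlanThenExecuteSketch) are hypothetical simplifications, not the ksqlDB classes, and the real methods also take a ServiceContext.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "plan, then execute the plan" (not the ksqlDB API).
final class PlanThenExecuteSketch {
  static final class Statement {
    final String text;
    Statement(String text) { this.text = text; }
  }

  static final class Plan {
    final String statementText;
    Plan(String statementText) { this.statementText = statementText; }
  }

  static final class Result {
    final String summary;
    Result(String summary) { this.summary = summary; }
  }

  // Records the order of the steps, for illustration only.
  private final List<String> trace = new ArrayList<>();

  Plan plan(Statement statement) {
    trace.add("plan");
    return new Plan(statement.text);
  }

  // Statement-based variant: plans first, then delegates to the plan-based one.
  Result execute(Statement statement) {
    return execute(plan(statement));
  }

  // Plan-based variant: does the actual execution.
  Result execute(Plan plan) {
    trace.add("execute");
    return new Result("executed: " + plan.statementText);
  }

  List<String> trace() {
    return new ArrayList<>(trace);
  }
}
```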
Statement Planning (plan)¶
KsqlPlan plan(
ServiceContext serviceContext,
ConfiguredStatement<?> statement)
plan is part of the KsqlExecutionContext abstraction.
plan creates an EngineExecutor to plan the given ConfiguredStatement.
getAllLiveQueries¶
List<QueryMetadata> getAllLiveQueries()
getAllLiveQueries is part of the KsqlExecutionContext abstraction.
getAllLiveQueries requests the EngineContext for the QueryRegistry that is then requested for all live queries.
Parsing SQL Statements¶
List<ParsedStatement> parse(
String sql)
parse is part of the KsqlExecutionContext abstraction.
parse requests the EngineContext to parse the given SQL statements (into a collection of ParsedStatements).
Preparing Statement for Execution¶
PreparedStatement<?> prepare(
ParsedStatement stmt,
Map<String, String> variablesMap)
prepare is part of the KsqlExecutionContext abstraction.
prepare requests the EngineContext to prepare the given ParsedStatement.
Executing Scalable Push Query¶
ScalablePushQueryMetadata executeScalablePushQuery(
ImmutableAnalysis analysis,
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
PushRouting pushRouting,
PushRoutingOptions pushRoutingOptions,
QueryPlannerOptions queryPlannerOptions,
Context context,
Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)
executeScalablePushQuery is part of the KsqlExecutionContext abstraction.
executeScalablePushQuery creates an EngineExecutor to execute a scalable push query.
Executing Table Pull Query¶
PullQueryResult executeTablePullQuery(
ImmutableAnalysis analysis,
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingOptions routingOptions,
QueryPlannerOptions plannerOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics,
boolean startImmediately,
Optional<ConsistencyOffsetVector> consistencyOffsetVector)
executeTablePullQuery is part of the KsqlExecutionContext abstraction.
executeTablePullQuery ...FIXME
isExecutableStatement¶
boolean isExecutableStatement(
Statement statement)
isExecutableStatement is positive (true) when the given Statement is one of the following:
isExecutableStatement is used when:
- EngineExecutor is requested to plan a statement (and throwOnNonExecutableStatement)
- RequestValidator is requested to validate a statement
Analyzing Query Statement With No Sink¶
ImmutableAnalysis analyzeQueryWithNoOutputTopic(
Query query,
String queryText,
Map<String, Object> configOverrides)
analyzeQueryWithNoOutputTopic creates a QueryAnalyzer with the following:
analyzeQueryWithNoOutputTopic requests the QueryAnalyzer to analyze the given Query statement (and no Sink).
In the end, analyzeQueryWithNoOutputTopic creates a RewrittenAnalysis (with the Analysis and QueryExecutionUtil.ColumnReferenceRewriter).
analyzeQueryWithNoOutputTopic is used when:
- QueryExecutor is requested to handle pull or push queries
getRowpartitionRowoffsetEnabled¶
boolean getRowpartitionRowoffsetEnabled(
KsqlConfig ksqlConfig,
Map<String, Object> configOverrides)
getRowpartitionRowoffsetEnabled takes the value of ksql.rowpartition.rowoffset.enabled from configOverrides (if available) or the given KsqlConfig.
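The "overrides first, then the base configuration" lookup can be illustrated with plain maps. This is only a sketch: ConfigLookupSketch is a hypothetical class, a Map stands in for KsqlConfig, and treating a missing key as false is a simplification made for brevity, not a statement about ksqlDB's default.

```java
import java.util.Map;

// Hypothetical illustration of an override-aware config lookup (not ksqlDB code).
final class ConfigLookupSketch {
  static final String KEY = "ksql.rowpartition.rowoffset.enabled";

  // baseConfig is a plain map standing in for KsqlConfig.
  static boolean rowpartitionRowoffsetEnabled(
      Map<String, Object> baseConfig,
      Map<String, Object> configOverrides) {
    // Prefer the per-request override when present, else fall back to the base config.
    Object value = configOverrides.containsKey(KEY)
        ? configOverrides.get(KEY)
        : baseConfig.get(KEY);
    // Assumption of this sketch: an absent or non-"true" value counts as false.
    return Boolean.parseBoolean(String.valueOf(value));
  }
}
```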
Executing Transient Query¶
TransientQueryMetadata executeTransientQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
boolean excludeTombstones)
executeTransientQuery is part of the KsqlExecutionContext abstraction.
executeTransientQuery creates an EngineExecutor to executeTransientQuery.
Creating Stream Pull Query¶
StreamPullQueryMetadata createStreamPullQuery(
ServiceContext serviceContext,
ImmutableAnalysis analysis,
ConfiguredStatement<Query> statementOrig,
boolean excludeTombstones)
createStreamPullQuery uses ksql.query.pull.stream.enabled to ensure that pull queries on streams are enabled. If not, createStreamPullQuery throws a KsqlStatementException:
Pull queries on streams are disabled.
To create a push query on the stream, add EMIT CHANGES to the end.
To enable pull queries on streams, set the ksql.query.pull.stream.enabled config to 'true'.
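The guard described above amounts to a configuration check that fails fast with the quoted message. A minimal sketch, under stated assumptions: StreamPullQueryGuardSketch is hypothetical, a Map stands in for the session configuration, IllegalStateException stands in for KsqlStatementException, and a missing key is treated as disabled only to keep the example short.

```java
import java.util.Map;

// Hypothetical fail-fast guard (not the ksqlDB implementation).
final class StreamPullQueryGuardSketch {
  static final String KEY = "ksql.query.pull.stream.enabled";

  static void throwIfDisabled(Map<String, Object> config) {
    // Assumption of this sketch: anything other than "true" means disabled.
    boolean enabled = Boolean.parseBoolean(String.valueOf(config.get(KEY)));
    if (!enabled) {
      // IllegalStateException is a stand-in for KsqlStatementException.
      throw new IllegalStateException(
          "Pull queries on streams are disabled.\n"
              + "To create a push query on the stream, add EMIT CHANGES to the end.\n"
              + "To enable pull queries on streams, set the " + KEY + " config to 'true'.");
    }
  }
}
```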
createStreamPullQuery ...FIXME
createStreamPullQuery creates an EngineExecutor to execute the stream pull query.
createStreamPullQuery ...FIXME
In the end, createStreamPullQuery returns a StreamPullQueryMetadata with the TransientQueryMetadata and the endOffsets.
createStreamPullQuery is used when:
- QueryExecutor is requested to handle a stream pull query
updateStreamsPropertiesAndRestartRuntime¶
void updateStreamsPropertiesAndRestartRuntime()
updateStreamsPropertiesAndRestartRuntime is part of the KsqlExecutionContext abstraction.
updateStreamsPropertiesAndRestartRuntime ...FIXME
getMetaStore¶
MetaStore getMetaStore()
getMetaStore is part of the KsqlExecutionContext abstraction.
getMetaStore requests the EngineContext for the MetaStore.
getPersistentQueries¶
List<PersistentQueryMetadata> getPersistentQueries()
getPersistentQueries is part of the KsqlExecutionContext abstraction.
getPersistentQueries requests the EngineContext for the QueryRegistry that is then requested for the persistent queries.
Demo: Creating KsqlEngine¶
import io.confluent.ksql.util.KsqlConfig
val ksqlConfig = KsqlConfig.empty
import io.confluent.ksql.services._
val serviceContext = ServiceContextFactory.create(ksqlConfig, () => DisabledKsqlClient.instance)
import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()
import io.confluent.ksql.function.InternalFunctionRegistry
val functionRegistry = new InternalFunctionRegistry()
import io.confluent.ksql.ServiceInfo
val serviceInfo = ServiceInfo.create(ksqlConfig)
import io.confluent.ksql.query.id.SequentialQueryIdGenerator
val queryIdGenerator = new SequentialQueryIdGenerator()
import io.confluent.ksql.engine.QueryEventListener
import scala.jdk.CollectionConverters._
val queryEventListeners = Seq.empty[QueryEventListener].asJava
import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()
import io.confluent.ksql.engine.KsqlEngine
val ksqlEngine = new KsqlEngine(
  serviceContext,
  processingLogContext,
  functionRegistry,
  serviceInfo,
  queryIdGenerator,
  ksqlConfig,
  queryEventListeners,
  metricCollectors)
Logging¶
Enable ALL logging level for io.confluent.ksql.engine.KsqlEngine logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.engine.KsqlEngine=ALL
Refer to Logging.