EngineExecutor
Creating Instance
EngineExecutor takes the following to be created:
- EngineContext
- ServiceContext
- SessionConfig
EngineExecutor is created using the create factory method.
Creating EngineExecutor
EngineExecutor create(
EngineContext engineContext,
ServiceContext serviceContext,
SessionConfig config)
create creates an EngineExecutor.
create is simply a convenient static factory method that does nothing but call new EngineExecutor. This little programming trick makes for more readable, fluent client code:
EngineExecutor
.create(...)
.plan(statement)
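The following sketch illustrates the static factory idiom in isolation. It is not ksqlDB code: PlanningExecutor is a hypothetical stand-in for EngineExecutor, its three String fields stand in for EngineContext, ServiceContext and SessionConfig, and plan returns a plain string instead of a KsqlPlan.

```java
import java.util.Objects;

/** Hypothetical stand-in for EngineExecutor (not the ksqlDB class). */
final class PlanningExecutor {
  private final String engineContext;  // stands in for EngineContext
  private final String serviceContext; // stands in for ServiceContext
  private final String sessionConfig;  // stands in for SessionConfig

  // Private constructor: callers are expected to go through create(...)
  private PlanningExecutor(String engineContext, String serviceContext, String sessionConfig) {
    this.engineContext = Objects.requireNonNull(engineContext, "engineContext");
    this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
    this.sessionConfig = Objects.requireNonNull(sessionConfig, "sessionConfig");
  }

  // Static factory method that does nothing but call the constructor
  static PlanningExecutor create(String engineContext, String serviceContext, String sessionConfig) {
    return new PlanningExecutor(engineContext, serviceContext, sessionConfig);
  }

  // Hypothetical planning step that only describes what it would plan
  String plan(String statement) {
    return "plan(" + statement + ") with " + sessionConfig;
  }
}
```

With the factory in place, a caller writes PlanningExecutor.create(...).plan(statement) as a single expression, which is the readability benefit of the fluent style.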
create is used when:
- KsqlEngine is requested to plan, execute, executeTransientQuery, createStreamPullQuery, executeScalablePushQuery, executeTablePullQuery
- SandboxedExecutionContext is requested to plan, execute, executeTransientQuery, executeTablePullQuery, executeScalablePushQuery
Planning Query for Execution
ExecutorPlans planQuery(
ConfiguredStatement<?> statement,
Query query,
Optional<Sink> sink,
Optional<String> withQueryId,
MetaStore metaStore)
In summary, planQuery takes a Query statement and creates an ExecutorPlans.
The optional Sink and query ID are only given when EngineExecutor is requested to plan a QueryContainer. Otherwise, they are both empty.
planQuery is used when:
- EngineExecutor is requested to execute transient and stream pull queries, and to plan a statement (and sourceTablePlan)
Step 1. Creating QueryEngine
planQuery requests the EngineContext to create a QueryEngine (for the ServiceContext).
Note
planQuery creates a QueryEngine every time it is executed.
Step 2. Building Logical Plan
planQuery builds a logical query plan (an OutputNode).
planQuery creates a LogicalPlanNode (for the query statement text and the OutputNode) and a QueryId, and looks up the PersistentQueryMetadata (in the QueryRegistry) for the QueryId (if one exists).
Step 3. Building Physical Plan
planQuery builds a physical query plan (a PhysicalPlan).
Step 4. Creating ExecutorPlans
In the end, planQuery creates an ExecutorPlans (with the LogicalPlanNode and the PhysicalPlan).
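The four steps of planQuery can be summarized in a single method. This is a minimal sketch under heavy simplification: QueryEngineStub, LogicalNode, PhysicalNode and PlansResult are hypothetical stand-ins for QueryEngine, LogicalPlanNode, PhysicalPlan and ExecutorPlans, the QUERY_n naming of generated query IDs is made up, and the PersistentQueryMetadata lookup is left out.

```java
import java.util.Optional;

/** Hypothetical stand-ins for LogicalPlanNode, PhysicalPlan and ExecutorPlans. */
record LogicalNode(String statementText, String outputNode) {}
record PhysicalNode(String queryId, String steps) {}
record PlansResult(LogicalNode logicalPlan, PhysicalNode physicalPlan) {}

/** Hypothetical stand-in for QueryEngine; real planners do far more than build strings. */
final class QueryEngineStub {
  String buildOutputNode(String query, Optional<String> sink) {
    return "Output[" + query + sink.map(s -> " -> " + s).orElse("") + "]";
  }

  PhysicalNode buildPhysicalPlan(LogicalNode logical, String queryId) {
    return new PhysicalNode(queryId, "steps-for-" + logical.outputNode());
  }
}

final class PlanQuerySketch {
  private int nextId = 0; // hypothetical query ID generator

  PlansResult planQuery(String statementText, String query,
                        Optional<String> sink, Optional<String> withQueryId) {
    // Step 1: a fresh query engine on every call
    QueryEngineStub engine = new QueryEngineStub();
    // Step 2: logical plan (the output node) together with the statement text
    LogicalNode logical = new LogicalNode(statementText, engine.buildOutputNode(query, sink));
    // Use the given query ID, or generate one (hypothetical naming scheme)
    String queryId = withQueryId.orElseGet(() -> "QUERY_" + nextId++);
    // Step 3: physical plan
    PhysicalNode physical = engine.buildPhysicalPlan(logical, queryId);
    // Step 4: bundle both plans
    return new PlansResult(logical, physical);
  }
}
```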
KSQL Statement Planning
KsqlPlan plan(
ConfiguredStatement<?> statement)
plan makes sure that the Statement (of the given ConfiguredStatement) is executable and throws a KsqlStatementException if not.
plan branches off based on the type of the statement:
- ExecutableDdlStatement
- QueryContainer
plan is used when:
- KsqlEngine is requested to plan a statement
- SandboxedExecutionContext is requested to plan a statement
ExecutableDdlStatement
For an ExecutableDdlStatement, plan determines whether it is a source CreateStream or a source CreateTable.
For a source CreateTable, plan delegates to sourceTablePlan.
Otherwise, plan requests the EngineContext to create a DdlCommand and then creates a KsqlPlan for it (with no physical query plan).
QueryContainer
Otherwise, plan assumes that the Statement is a QueryContainer and plans the query (with the Sink and the query ID, among others), which gives a PhysicalPlan.
plan then uses maybeCreateSinkDdl for an optional DdlCommand for the sink.
plan creates a QueryPlan.
In the end, plan creates a KsqlPlanV1.
Exceptions
plan throws a KsqlStatementException for a non-executable statement.
plan throws a KsqlStatementException for a source CreateStream or a source CreateTable when the ksql.source.table.materialization.enabled configuration property is disabled:
Cannot execute command because source table materialization is disabled.
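The branching in plan, including both exceptions, can be sketched as follows. The statement records, the materializationEnabled flag (standing in for ksql.source.table.materialization.enabled) and the string results are hypothetical. IllegalArgumentException stands in for KsqlStatementException, and the real ExecutableDdlStatement family covers more statement types than the two modeled here.

```java
/** Hypothetical, simplified statement types (not the ksqlDB AST). */
interface Stmt { boolean executable(); }
record CreateStreamStmt(String name, boolean isSource) implements Stmt {
  public boolean executable() { return true; }
}
record CreateTableStmt(String name, boolean isSource) implements Stmt {
  public boolean executable() { return true; }
}
record QueryContainerStmt(String query, String sink) implements Stmt {
  public boolean executable() { return true; }
}
record ShowStmt() implements Stmt {
  public boolean executable() { return false; }
}

final class PlanDispatchSketch {
  // Stands in for ksql.source.table.materialization.enabled
  private final boolean materializationEnabled;

  PlanDispatchSketch(boolean materializationEnabled) {
    this.materializationEnabled = materializationEnabled;
  }

  String plan(Stmt statement) {
    if (!statement.executable()) {
      // Stands in for KsqlStatementException
      throw new IllegalArgumentException("Statement is not executable");
    }
    boolean sourceStream = statement instanceof CreateStreamStmt s && s.isSource();
    boolean sourceTable = statement instanceof CreateTableStmt t && t.isSource();
    if ((sourceStream || sourceTable) && !materializationEnabled) {
      throw new IllegalArgumentException(
          "Cannot execute command because source table materialization is disabled.");
    }
    if (sourceTable) {
      return "sourceTablePlan";          // delegate to sourceTablePlan
    }
    if (statement instanceof CreateStreamStmt || statement instanceof CreateTableStmt) {
      return "ddlPlan";                  // DDL command only, no physical plan
    }
    // Otherwise the statement is assumed to be a query container
    QueryContainerStmt q = (QueryContainerStmt) statement;
    return "queryPlan(" + q.query() + " -> " + q.sink() + ")";
  }
}
```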
maybeCreateSinkDdl
Optional<DdlCommand> maybeCreateSinkDdl(
ConfiguredStatement<?> cfgStatement,
KsqlStructuredDataOutputNode outputNode)
For a KsqlStructuredDataOutputNode with no createInto, maybeCreateSinkDdl validates the existing sink (validateExistingSink) and returns an empty value.
Otherwise, maybeCreateSinkDdl requests the EngineContext to createDdlCommand for the given KsqlStructuredDataOutputNode.
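A sketch of the Optional-returning logic of maybeCreateSinkDdl. SinkNode and SinkDdlSketch are hypothetical. In particular, the existence-only check in validateExistingSink is an assumption made for illustration, not a description of what ksqlDB actually validates.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for KsqlStructuredDataOutputNode. */
record SinkNode(String sinkName, boolean createInto) {}

final class SinkDdlSketch {
  private final Set<String> existingSinks; // hypothetical registry of known sinks

  SinkDdlSketch(Set<String> existingSinks) {
    this.existingSinks = Set.copyOf(existingSinks);
  }

  Optional<String> maybeCreateSinkDdl(SinkNode node) {
    if (!node.createInto()) {
      // No DDL in this sketch: the sink is expected to exist already
      validateExistingSink(node);
      return Optional.empty();
    }
    // Stands in for the EngineContext creating a DdlCommand for the sink
    return Optional.of("CREATE " + node.sinkName());
  }

  // Hypothetical validation: only checks that the sink is known
  private void validateExistingSink(SinkNode node) {
    if (!existingSinks.contains(node.sinkName())) {
      throw new IllegalStateException("Sink does not exist: " + node.sinkName());
    }
  }
}
```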
Planning Source Table Query
KsqlPlan sourceTablePlan(
ConfiguredStatement<?> statement)
sourceTablePlan assumes that the given ConfiguredStatement is for a CreateTable.
sourceTablePlan requests the EngineContext to create a CreateTableCommand.
sourceTablePlan creates a Query with the following:
- AliasedRelation for the source table
- Select
- RefinementInfo with CHANGES output refinement
- pullQuery flag off
sourceTablePlan registers a KsqlTable in a temporary MetaStoreImpl (so that any future resolution of the table name works).
sourceTablePlan plans the query statement (with the given ConfiguredStatement, the Query, no sink and the temporary MetaStoreImpl), which produces an ExecutorPlans.
sourceTablePlan uses the ExecutorPlans to access a KsqlBareOutputNode.
sourceTablePlan creates a QueryPlan with the following:
- getSourceNames of the KsqlBareOutputNode
- No sink
- Other parameters from the ExecutorPlans
sourceTablePlan requests the EngineContext for the QueryValidator to validateQuery.
In the end, sourceTablePlan creates a KsqlPlan (a KsqlPlanV1).
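The idea of planning against a temporary metastore can be sketched as follows: the table that the source CreateTable is about to define is registered in a throwaway metastore so that name resolution during planning succeeds. TinyMetaStore and SourceTablePlanSketch are hypothetical, map-backed simplifications of MetaStoreImpl and sourceTablePlan.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, map-backed stand-in for MetaStoreImpl. */
final class TinyMetaStore {
  private final Map<String, String> sources = new HashMap<>();

  void putSource(String name, String kind) { sources.put(name, kind); }

  Optional<String> getSource(String name) { return Optional.ofNullable(sources.get(name)); }
}

final class SourceTablePlanSketch {
  /** Plans a query over the given table against a temporary metastore. */
  static String sourceTablePlan(String tableName) {
    TinyMetaStore temp = new TinyMetaStore();
    // Register the table (not created yet) so that planning can resolve its name
    temp.putSource(tableName, "TABLE");
    return planQuery(tableName, temp);
  }

  // Hypothetical planning step: fails if the source name cannot be resolved
  private static String planQuery(String from, TinyMetaStore metaStore) {
    String kind = metaStore.getSource(from)
        .orElseThrow(() -> new IllegalArgumentException("Unknown source: " + from));
    return "QueryPlan[sources=" + from + ", kind=" + kind + ", sink=none]";
  }
}
```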
Executing KsqlPlan
ExecuteResult execute(
KsqlPlan plan)
execute is made up of different "execution paths" to handle different variants of KsqlPlans, mainly whether a physical query plan is available or not.
execute throws an IllegalStateException when the given KsqlPlan has neither a physical query plan nor a DDL command:
DdlResult should be present if there is no physical plan.
execute is used when:
- KsqlEngine is requested to execute a query
- SandboxedExecutionContext is requested to execute a query
DDL Command
For a KsqlPlan with no physical query plan, execute executes the DDL command of the KsqlPlan (with the withQuery flag off) and returns an ExecuteResult.
QueryPlan
Physical Plan with optional DdlCommand
The given KsqlPlan may have a physical plan with or without a DdlCommand.
For a KsqlPlan with a physical query plan, execute executes the DDL command (if available) and then the persistent query.
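The execution paths of execute can be sketched with plain Optionals. PlanSketch is a hypothetical stand-in for KsqlPlan with an optional DDL command and an optional query plan, and the returned strings merely record which steps ran.

```java
import java.util.Optional;

/** Hypothetical stand-in for KsqlPlan. */
record PlanSketch(String statementText, Optional<String> ddlCommand, Optional<String> queryPlan) {}

final class ExecuteSketch {
  static String execute(PlanSketch plan) {
    if (plan.queryPlan().isEmpty()) {
      // DDL-only path: a DDL command must be present
      return plan.ddlCommand()
          .map(ddl -> "ddl:" + ddl)
          .orElseThrow(() -> new IllegalStateException(
              "DdlResult should be present if there is no physical plan."));
    }
    // Query path: the optional DDL command first, then the persistent query
    String ddlPart = plan.ddlCommand().map(ddl -> "ddl:" + ddl + ";").orElse("");
    return ddlPart + "query:" + plan.queryPlan().get();
  }
}
```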
Executing Persistent Query
PersistentQueryMetadata executePersistentQuery(
QueryPlan queryPlan,
String statementText,
KsqlConstants.PersistentQueryType persistentQueryType)
executePersistentQuery requests the EngineContext for the QueryRegistry to create or replace a persistent query (for the given QueryPlan and PersistentQueryType).
Executing Scalable Push Query
Signature
ScalablePushQueryMetadata executeScalablePushQuery(
ImmutableAnalysis analysis,
ConfiguredStatement<Query> statement,
PushRouting pushRouting,
PushRoutingOptions pushRoutingOptions,
QueryPlannerOptions queryPlannerOptions,
Context context,
Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)
executeScalablePushQuery builds a logical plan of the query (with the isScalablePush flag enabled).
In the end, executeScalablePushQuery creates a ScalablePushQueryMetadata (with a new TransientQueryQueue).
Usage
executeScalablePushQuery is used when:
- KsqlEngine is requested to execute a scalable push query
- SandboxedExecutionContext is requested to execute a scalable push query
Executing Stream Pull Query
Signature
TransientQueryMetadata executeStreamPullQuery(
ConfiguredStatement<Query> statement,
boolean excludeTombstones,
ImmutableMap<TopicPartition, Long> endOffsets)
executeStreamPullQuery plans the query (with an empty sink and query ID).
executeStreamPullQuery requests the EngineContext for the QueryValidator to validateQuery (with the SessionConfig and the ExecutionPlan with ExecutionSteps).
In the end, executeStreamPullQuery requests the EngineContext for the QueryRegistry to create a stream pull query.
Executing Transient Query
Signature
TransientQueryMetadata executeTransientQuery(
ConfiguredStatement<Query> statement,
boolean excludeTombstones)
executeTransientQuery
...FIXME
In the end, executeTransientQuery requests the EngineContext for the QueryRegistry to create a transient query.
Usage
executeTransientQuery is used when:
- KsqlEngine is requested to execute a transient query
- SandboxedExecutionContext is requested to execute a transient query
Executing Table Pull Query
PullQueryResult executeTablePullQuery(
ImmutableAnalysis analysis,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingOptions routingOptions,
QueryPlannerOptions queryPlannerOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics,
boolean startImmediately,
Optional<ConsistencyOffsetVector> consistencyOffsetVector)
executeTablePullQuery uses buildAndValidateLogicalPlan followed by buildPullPhysicalPlan.
executeTablePullQuery creates a PullQueryQueue and a PullQueryQueuePopulator (that requests the given HARouting to handlePullQuery).
executeTablePullQuery creates a PullQueryResult (and starts it when the given startImmediately flag is enabled).
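The queue, the populator and the startImmediately flag follow a producer/consumer pattern, sketched below with java.util.concurrent. RoutingSketch, PullResultSketch and TablePullSketch are hypothetical stand-ins for HARouting, PullQueryResult and executeTablePullQuery, and using an unbounded LinkedBlockingQueue in place of PullQueryQueue is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical stand-in for HARouting: pushes rows of a query into a sink. */
interface RoutingSketch {
  void handlePullQuery(String query, Consumer<String> rowSink);
}

/** Hypothetical stand-in for PullQueryResult. */
final class PullResultSketch {
  private final LinkedBlockingQueue<String> queue; // stands in for PullQueryQueue
  private final Runnable populator;                // stands in for PullQueryQueuePopulator
  private CompletableFuture<Void> future;          // guarded by this

  PullResultSketch(LinkedBlockingQueue<String> queue, Runnable populator) {
    this.queue = queue;
    this.populator = populator;
  }

  // Starts the populator at most once, on a background thread
  synchronized CompletableFuture<Void> start() {
    if (future == null) {
      future = CompletableFuture.runAsync(populator);
    }
    return future;
  }

  synchronized boolean isStarted() {
    return future != null;
  }

  // Starts if needed, waits for the populator to finish, then drains the queued rows
  List<String> awaitRows() {
    start().join();
    List<String> rows = new ArrayList<>();
    queue.drainTo(rows);
    return rows;
  }
}

final class TablePullSketch {
  static PullResultSketch executeTablePullQuery(
      String query, RoutingSketch routing, boolean startImmediately) {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // The populator asks the routing layer to fill the queue
    Runnable populator = () -> routing.handlePullQuery(query, queue::add);
    PullResultSketch result = new PullResultSketch(queue, populator);
    if (startImmediately) {
      result.start();
    }
    return result;
  }
}
```

Deferring start() lets a caller set up consumers before any rows flow, while startImmediately covers the common case of running right away.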
executeTablePullQuery is used when:
- KsqlEngine is requested to execute a table pull query
- SandboxedExecutionContext is requested to execute a table pull query
Building Query Logical Plan
LogicalPlanNode buildAndValidateLogicalPlan(
ConfiguredStatement<?> statement,
ImmutableAnalysis analysis,
KsqlConfig config,
QueryPlannerOptions queryPlannerOptions,
boolean isScalablePush)
buildAndValidateLogicalPlan is given a statement (along with the analysis) and the isScalablePush flag as follows:
- true when executing a scalable push query
- false when executing a table pull query
Naming
Since buildAndValidateLogicalPlan executes buildQueryLogicalPlan, it would make sense to name it alike, wouldn't it?
After all, buildAndValidateLogicalPlan is a facade to LogicalPlanner.
buildAndValidateLogicalPlan creates a LogicalPlanner to build a logical plan of a query (that gives an OutputNode).
In the end, buildAndValidateLogicalPlan creates a LogicalPlanNode (with the statement text of the given ConfiguredStatement and the created OutputNode).
buildAndValidateLogicalPlan is used when:
- EngineExecutor is requested to execute table pull or scalable push queries
Executing DDL Command
String executeDdl(
DdlCommand ddlCommand,
String statementText,
boolean withQuery,
Set<SourceName> withQuerySources,
boolean restoreInProgress)
executeDdl requests the EngineContext to executeDdl.
getSourceNames
Set<SourceName> getSourceNames(
PlanNode outputNode)
getSourceNames gives the names of all DataSources (via DataSourceNodes) in the given PlanNode.
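getSourceNames amounts to a traversal of the plan tree that collects the names found on data source nodes. The node types below are hypothetical simplifications of PlanNode and DataSourceNode; the sketch uses a LinkedHashSet so that each source name is reported once, in first-visit order.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified plan node with child sources. */
interface Node { List<Node> sources(); }

/** Hypothetical leaf carrying the name of a data source. */
record DataSourceNodeSketch(String sourceName) implements Node {
  public List<Node> sources() { return List.of(); }
}

/** Hypothetical inner node with any number of children (e.g. a join). */
record JoinNodeSketch(List<Node> sources) implements Node {}

final class SourceNamesSketch {
  static Set<String> getSourceNames(Node outputNode) {
    Set<String> names = new LinkedHashSet<>();
    collect(outputNode, names);
    return names;
  }

  // Depth-first walk: record data source names, then visit the children
  private static void collect(Node node, Set<String> names) {
    if (node instanceof DataSourceNodeSketch ds) {
      names.add(ds.sourceName());
    }
    for (Node child : node.sources()) {
      collect(child, names);
    }
  }
}
```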
getSourceNames is used when:
- EngineExecutor is requested to execute transient and stream pull queries, and to plan a KSQL statement (and sourceTablePlan)
Execution Plan Summary
String buildPlanSummary(
QueryId queryId,
ExecutionStep<?> plan)
buildPlanSummary
...FIXME
buildPlanSummary is used when:
- EngineExecutor is requested to execute transient, stream pull and persistent queries (and requests the QueryRegistry to createTransientQuery, createStreamPullQuery and createOrReplacePersistentQuery, respectively)