EngineExecutor¶
Creating Instance¶
EngineExecutor takes the following to be created:
- EngineContext
- ServiceContext
- SessionConfig
EngineExecutor is created using the create factory method.
Creating EngineExecutor¶
EngineExecutor create(
EngineContext engineContext,
ServiceContext serviceContext,
SessionConfig config)
create creates an EngineExecutor.
create is a convenience static factory method that does nothing but call new EngineExecutor. This small trick makes for more readable, fluent client code.
EngineExecutor
.create(...)
.plan(statement)
create is used when:
- KsqlEngine is requested to plan, execute, executeTransientQuery, createStreamPullQuery, executeScalablePushQuery, executeTablePullQuery
- SandboxedExecutionContext is requested to plan, execute, executeTransientQuery, executeTablePullQuery, executeScalablePushQuery
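The static-factory idiom described above can be sketched as follows. This is only an illustration: DemoExecutor, DemoEngineContext, DemoServiceContext and DemoSessionConfig are hypothetical stand-ins for the real classes, and the plan method here merely returns a string.

```java
// Hypothetical stand-ins that only illustrate the static-factory idiom.
class StaticFactoryDemo {
  public static void main(String[] args) {
    String plan = DemoExecutor
        .create(new DemoEngineContext(), new DemoServiceContext(), new DemoSessionConfig())
        .plan("SELECT * FROM s EMIT CHANGES;");
    System.out.println(plan);
  }
}

final class DemoEngineContext {}
final class DemoServiceContext {}
final class DemoSessionConfig {}

final class DemoExecutor {
  private final DemoEngineContext engineContext;
  private final DemoServiceContext serviceContext;
  private final DemoSessionConfig config;

  // Kept private in this sketch so that create is the only entry point.
  private DemoExecutor(
      DemoEngineContext engineContext,
      DemoServiceContext serviceContext,
      DemoSessionConfig config) {
    this.engineContext = engineContext;
    this.serviceContext = serviceContext;
    this.config = config;
  }

  // Static factory: does nothing but call the constructor.
  static DemoExecutor create(
      DemoEngineContext engineContext,
      DemoServiceContext serviceContext,
      DemoSessionConfig config) {
    return new DemoExecutor(engineContext, serviceContext, config);
  }

  // Placeholder for real planning; returns a description only.
  String plan(String statementText) {
    return "plan for: " + statementText;
  }
}
```

The factory lets the call site read as one chained expression (create, then plan) with no new keyword in the middle.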
Planning Query for Execution¶
ExecutorPlans planQuery(
ConfiguredStatement<?> statement,
Query query,
Optional<Sink> sink,
Optional<String> withQueryId,
MetaStore metaStore)
In summary, planQuery takes a Query statement and creates an ExecutorPlans.
The optional Sink and query ID are only given when EngineExecutor is requested to plan a QueryContainer. Otherwise, they are both undefined.
planQuery is used when:
- EngineExecutor is requested to execute transient and stream pull queries, and to plan a statement (and sourceTablePlan)
Step 1. Creating QueryEngine¶
planQuery requests the EngineContext to create a QueryEngine (for the ServiceContext).
Note
planQuery creates a QueryEngine every time it is executed.
Step 2. Building Logical Plan¶
planQuery builds a logical query plan (an OutputNode).
planQuery creates a LogicalPlanNode (for the query statement text and the OutputNode) and a QueryId. planQuery then looks up a PersistentQueryMetadata (in the QueryRegistry) for the QueryId (if one exists).
Step 3. Building Physical Plan¶
planQuery builds a physical query plan (a PhysicalPlan).
Step 4. Creating ExecutorPlans¶
In the end, planQuery creates an ExecutorPlans (with the LogicalPlanNode and the PhysicalPlan).
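The four steps above can be sketched roughly as follows. All types here (DemoQueryEngine, DemoPlans, PlanQuerySketch) are hypothetical, plans are represented as plain strings, and the QueryId and QueryRegistry lookup are left out. The counter is there only to make the note about creating a QueryEngine on every call visible.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PlanQueryDemo {
  public static void main(String[] args) {
    DemoPlans plans = PlanQuerySketch.planQuery("SELECT * FROM s EMIT CHANGES;");
    System.out.println(plans.logicalPlan);
    System.out.println(plans.physicalPlan);
  }
}

// Hypothetical stand-in for QueryEngine; counts how many instances were created.
final class DemoQueryEngine {
  static final AtomicInteger CREATED = new AtomicInteger();

  DemoQueryEngine() {
    CREATED.incrementAndGet();
  }

  String buildLogicalPlan(String statementText) {
    return "OutputNode(" + statementText + ")";
  }

  String buildPhysicalPlan(String logicalPlanNode) {
    return "PhysicalPlan(" + logicalPlanNode + ")";
  }
}

// Hypothetical stand-in for ExecutorPlans: a pair of logical and physical plans.
final class DemoPlans {
  final String logicalPlan;
  final String physicalPlan;

  DemoPlans(String logicalPlan, String physicalPlan) {
    this.logicalPlan = logicalPlan;
    this.physicalPlan = physicalPlan;
  }
}

final class PlanQuerySketch {
  static DemoPlans planQuery(String statementText) {
    // Step 1: a new query engine for every call
    DemoQueryEngine queryEngine = new DemoQueryEngine();
    // Step 2: logical plan (an output node wrapped with the statement text)
    String outputNode = queryEngine.buildLogicalPlan(statementText);
    String logicalPlanNode = "LogicalPlanNode(" + statementText + ", " + outputNode + ")";
    // Step 3: physical plan
    String physicalPlan = queryEngine.buildPhysicalPlan(logicalPlanNode);
    // Step 4: both plans bundled together
    return new DemoPlans(logicalPlanNode, physicalPlan);
  }
}
```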
KSQL Statement Planning¶
KsqlPlan plan(
ConfiguredStatement<?> statement)
plan makes sure that the Statement (of the given ConfiguredStatement) is executable and throws a KsqlStatementException if not.
plan branches off based on the type of the statement:
- ExecutableDdlStatement
- QueryContainer
plan is used when:
- KsqlEngine is requested to plan a statement
- SandboxedExecutionContext is requested to plan a statement
ExecutableDdlStatement¶
For an ExecutableDdlStatement, plan determines whether it is a CreateStream or a CreateTable (possibly SOURCEs).
For a source CreateTable, plan uses sourceTablePlan.
Otherwise, plan requests the EngineContext to create a DdlCommand and then plans it.
QueryContainer¶
Otherwise, plan assumes that the Statement is a QueryContainer and plans the query (with the Sink, among other arguments) to get a PhysicalPlan.
plan uses maybeCreateSinkDdl (to create a DdlCommand for the sink, if any).
plan creates a QueryPlan.
In the end, plan creates a KsqlPlanV1.
Exceptions¶
plan throws a KsqlStatementException for a non-executable statement.
plan throws a KsqlStatementException for a source CreateStream or a source CreateTable when the ksql.source.table.materialization.enabled configuration property is disabled:
Cannot execute command because source table materialization is disabled.
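The branching and the two failure cases described above can be sketched as follows. This is a simplified model under stated assumptions: DemoStatement, DemoDdl, DemoQueryContainer, DemoOther and DemoStatementException are hypothetical stand-ins, the returned strings only name the path taken, and the message for the non-executable case is made up for illustration.

```java
class PlanBranchingDemo {
  public static void main(String[] args) {
    System.out.println(PlanSketch.plan(new DemoDdl(true, true), true));
    System.out.println(PlanSketch.plan(new DemoDdl(false, false), true));
    System.out.println(PlanSketch.plan(new DemoQueryContainer(), true));
  }
}

interface DemoStatement {}

// Stand-in for an ExecutableDdlStatement (a CreateStream or a CreateTable).
final class DemoDdl implements DemoStatement {
  final boolean table;   // true for CreateTable, false for CreateStream
  final boolean source;  // true for a SOURCE stream or table

  DemoDdl(boolean table, boolean source) {
    this.table = table;
    this.source = source;
  }
}

final class DemoQueryContainer implements DemoStatement {}

// Any other statement type, treated as non-executable in this sketch.
final class DemoOther implements DemoStatement {}

// Stand-in for KsqlStatementException.
final class DemoStatementException extends RuntimeException {
  DemoStatementException(String message) {
    super(message);
  }
}

final class PlanSketch {
  static String plan(DemoStatement statement, boolean materializationEnabled) {
    if (!(statement instanceof DemoDdl) && !(statement instanceof DemoQueryContainer)) {
      // Hypothetical message; the source does not quote the real one.
      throw new DemoStatementException("Statement is not executable");
    }
    if (statement instanceof DemoDdl) {
      DemoDdl ddl = (DemoDdl) statement;
      if (ddl.source && !materializationEnabled) {
        throw new DemoStatementException(
            "Cannot execute command because source table materialization is disabled.");
      }
      // Source tables get their own planning path; other DDL becomes a DdlCommand.
      return (ddl.table && ddl.source) ? "sourceTablePlan" : "ddlCommandPlan";
    }
    // Anything left is a QueryContainer.
    return "queryPlan";
  }
}
```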
maybeCreateSinkDdl¶
Optional<DdlCommand> maybeCreateSinkDdl(
ConfiguredStatement<?> cfgStatement,
KsqlStructuredDataOutputNode outputNode)
For a KsqlStructuredDataOutputNode with no createInto, maybeCreateSinkDdl uses validateExistingSink and returns an empty value.
Otherwise, maybeCreateSinkDdl requests the EngineContext to createDdlCommand for the given KsqlStructuredDataOutputNode.
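A minimal sketch of this Optional-returning shape, assuming a hypothetical DemoOutputNode with a createInto flag and a DDL command represented as a string. The validateExistingSink step mentioned above is left out of the sketch.

```java
import java.util.Optional;

class SinkDdlDemo {
  public static void main(String[] args) {
    System.out.println(SinkDdlSketch.maybeCreateSinkDdl(new DemoOutputNode("S2", true)));
    System.out.println(SinkDdlSketch.maybeCreateSinkDdl(new DemoOutputNode("S2", false)));
  }
}

// Hypothetical stand-in for KsqlStructuredDataOutputNode.
final class DemoOutputNode {
  final String sinkName;
  final boolean createInto;

  DemoOutputNode(String sinkName, boolean createInto) {
    this.sinkName = sinkName;
    this.createInto = createInto;
  }
}

final class SinkDdlSketch {
  static Optional<String> maybeCreateSinkDdl(DemoOutputNode outputNode) {
    if (!outputNode.createInto) {
      // The sink is not created by this statement, so there is no DDL command.
      return Optional.empty();
    }
    // Stands in for asking the EngineContext to create a DdlCommand.
    return Optional.of("DdlCommand(" + outputNode.sinkName + ")");
  }
}
```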
Planning Source Table Query¶
KsqlPlan sourceTablePlan(
ConfiguredStatement<?> statement)
sourceTablePlan assumes that the given ConfiguredStatement is for a CreateTable.
sourceTablePlan requests the EngineContext to create a CreateTableCommand.
sourceTablePlan creates a Query with the following:
- AliasedRelation for the source table
- Select
- RefinementInfo with CHANGES output refinement
- pullQuery flag off
sourceTablePlan registers a KsqlTable in a temporary MetaStoreImpl (so any future table name resolution will work).
sourceTablePlan plans the query statement (with the given ConfiguredStatement, the Query, no sink and the temporary MetaStoreImpl). It will produce an ExecutorPlans.
sourceTablePlan uses the ExecutorPlans to access a KsqlBareOutputNode.
sourceTablePlan creates a QueryPlan with the following:
- getSourceNames of the KsqlBareOutputNode
- No sink
- Other params from the ExecutorPlans
sourceTablePlan requests the EngineContext for the QueryValidator to validateQuery.
In the end, sourceTablePlan creates a KsqlPlan (a KsqlPlanV1).
Executing KsqlPlan¶
ExecuteResult execute(
KsqlPlan plan)
execute is made up of different "execution paths" that handle different variants of KsqlPlans, depending mainly on whether a DdlCommand is available or not.
execute throws an IllegalStateException when the given KsqlPlan has neither physical query plan nor DDL command:
DdlResult should be present if there is no physical plan.
execute is used when:
- KsqlEngine is requested to execute a query
- SandboxedExecutionContext is requested to execute a query
DDL Command¶
With a KsqlPlan with no physical query plan, execute executes the DDL command (of the KsqlPlan with the withQuery flag off) and returns an ExecuteResult.
QueryPlan¶
Physical Plan with optional DdlCommand
The given KsqlPlan may have a physical plan with or without a DdlCommand.
For a KsqlPlan with a physical query plan, execute executes a DDL command (if available) and then the persistent query.
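The execution paths can be sketched as follows, assuming a hypothetical DemoKsqlPlan that holds an optional DDL command and an optional query plan (both as strings). The returned strings only describe what would be executed.

```java
import java.util.Optional;

class ExecutePathsDemo {
  public static void main(String[] args) {
    System.out.println(ExecuteSketch.execute(
        new DemoKsqlPlan(Optional.of("CREATE STREAM s"), Optional.empty())));
    System.out.println(ExecuteSketch.execute(
        new DemoKsqlPlan(Optional.of("CREATE TABLE t"), Optional.of("query for t"))));
  }
}

// Hypothetical stand-in for KsqlPlan.
final class DemoKsqlPlan {
  final Optional<String> ddlCommand;
  final Optional<String> queryPlan;

  DemoKsqlPlan(Optional<String> ddlCommand, Optional<String> queryPlan) {
    this.ddlCommand = ddlCommand;
    this.queryPlan = queryPlan;
  }
}

final class ExecuteSketch {
  static String execute(DemoKsqlPlan plan) {
    if (!plan.queryPlan.isPresent()) {
      // No physical plan: a DDL command is required.
      String ddl = plan.ddlCommand.orElseThrow(() -> new IllegalStateException(
          "DdlResult should be present if there is no physical plan."));
      return "ddl:" + ddl;
    }
    // Physical plan: run the DDL command first (if any), then the persistent query.
    String ddlPart = plan.ddlCommand.map(ddl -> "ddl:" + ddl + ", ").orElse("");
    return ddlPart + "query:" + plan.queryPlan.get();
  }
}
```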
Executing Persistent Query¶
PersistentQueryMetadata executePersistentQuery(
QueryPlan queryPlan,
String statementText,
KsqlConstants.PersistentQueryType persistentQueryType)
executePersistentQuery requests the EngineContext for the QueryRegistry to create or replace a persistent query (for the given QueryPlan and PersistentQueryType).
Executing Scalable Push Query¶
Signature
ScalablePushQueryMetadata executeScalablePushQuery(
ImmutableAnalysis analysis,
ConfiguredStatement<Query> statement,
PushRouting pushRouting,
PushRoutingOptions pushRoutingOptions,
QueryPlannerOptions queryPlannerOptions,
Context context,
Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)
executeScalablePushQuery builds a logical plan of the query (with isScalablePush flag enabled).
In the end, executeScalablePushQuery creates a ScalablePushQueryMetadata (with a new TransientQueryQueue).
Usage
executeScalablePushQuery is used when:
- KsqlEngine is requested to execute a scalable push query
- SandboxedExecutionContext is requested to execute a scalable push query
Executing Stream Pull Query¶
Signature
TransientQueryMetadata executeStreamPullQuery(
ConfiguredStatement<Query> statement,
boolean excludeTombstones,
ImmutableMap<TopicPartition, Long> endOffsets)
executeStreamPullQuery plans the query (with an empty sink and query ID).
executeStreamPullQuery requests the EngineContext to create a QueryValidator to validateQuery (with the SessionConfig and the ExecutionPlan with ExecutionSteps).
In the end, executeStreamPullQuery requests the EngineContext for the QueryRegistry to create a stream pull query.
Executing Transient Query¶
Signature
TransientQueryMetadata executeTransientQuery(
ConfiguredStatement<Query> statement,
boolean excludeTombstones)
executeTransientQuery...FIXME
In the end, executeTransientQuery requests the EngineContext for the QueryRegistry to create a transient query.
Usage
executeTransientQuery is used when:
- KsqlEngine is requested to execute a transient query
- SandboxedExecutionContext is requested to execute a transient query
Executing Table Pull Query¶
PullQueryResult executeTablePullQuery(
ImmutableAnalysis analysis,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingOptions routingOptions,
QueryPlannerOptions queryPlannerOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics,
boolean startImmediately,
Optional<ConsistencyOffsetVector> consistencyOffsetVector)
executeTablePullQuery uses buildAndValidateLogicalPlan followed by buildPullPhysicalPlan.
executeTablePullQuery creates a PullQueryQueue and a PullQueryQueuePopulator (to request the given HARouting to handlePullQuery).
executeTablePullQuery creates a PullQueryResult (and starts it when the given startImmediately flag is enabled).
executeTablePullQuery is used when:
- KsqlEngine is requested to execute a table pull query
- SandboxedExecutionContext is requested to execute a table pull query
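The queue, populator and startImmediately flag described above can be sketched as follows. This is a deliberately simplified, synchronous model: DemoPullQueryResult and TablePullQuerySketch are hypothetical, the populator is a plain Runnable that copies pre-computed rows into the queue, and routing is not modelled at all.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TablePullQueryDemo {
  public static void main(String[] args) {
    DemoPullQueryResult result =
        TablePullQuerySketch.executeTablePullQuery(List.of("row-1", "row-2"), false);
    System.out.println("before start: " + result.rows().size());
    result.start();
    System.out.println("after start: " + result.rows().size());
  }
}

// Hypothetical stand-in for PullQueryResult: a queue plus the task that fills it.
final class DemoPullQueryResult {
  private final BlockingQueue<String> queue;
  private final Runnable populator;
  private boolean started;

  DemoPullQueryResult(BlockingQueue<String> queue, Runnable populator) {
    this.queue = queue;
    this.populator = populator;
  }

  // Runs the populator at most once.
  void start() {
    if (!started) {
      started = true;
      populator.run();
    }
  }

  BlockingQueue<String> rows() {
    return queue;
  }
}

final class TablePullQuerySketch {
  static DemoPullQueryResult executeTablePullQuery(
      List<String> routedRows, boolean startImmediately) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stands in for the populator that asks the routing layer for rows.
    Runnable populator = () -> queue.addAll(routedRows);
    DemoPullQueryResult result = new DemoPullQueryResult(queue, populator);
    if (startImmediately) {
      result.start();
    }
    return result;
  }
}
```

Deferring the start lets a caller set up whatever consumes the queue before any rows are produced.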
Building Query Logical Plan¶
LogicalPlanNode buildAndValidateLogicalPlan(
ConfiguredStatement<?> statement,
ImmutableAnalysis analysis,
KsqlConfig config,
QueryPlannerOptions queryPlannerOptions,
boolean isScalablePush)
buildAndValidateLogicalPlan is given a statement (along with the analysis) and the isScalablePush flag as follows:
- true when executing a scalable push query
- false when executing a table pull query
Naming
Since buildAndValidateLogicalPlan is to execute buildQueryLogicalPlan it'd make sense to call it alike, wouldn't it?
After all, buildAndValidateLogicalPlan is a facade to LogicalPlanner.
buildAndValidateLogicalPlan creates a LogicalPlanner to build a logical plan of a query (that gives an OutputNode).
In the end, buildAndValidateLogicalPlan creates a LogicalPlanNode (with the statement text of the given ConfiguredStatement and the created OutputNode).
buildAndValidateLogicalPlan is used when:
- EngineExecutor is requested to execute table pull or scalable push queries
Executing DDL Command¶
String executeDdl(
DdlCommand ddlCommand,
String statementText,
boolean withQuery,
Set<SourceName> withQuerySources,
boolean restoreInProgress)
executeDdl requests the EngineContext to executeDdl.
getSourceNames¶
Set<SourceName> getSourceNames(
PlanNode outputNode)
getSourceNames gives the names of all DataSources (via DataSourceNodes) in the given PlanNode.
getSourceNames is used when:
- EngineExecutor is requested to execute transient and stream pull queries, plan a KSQL statement (and sourceTablePlan)
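Collecting source names from a plan tree can be sketched as a simple recursive walk. The DemoPlanNode and DemoDataSourceNode classes below are hypothetical, source names are plain strings, and the traversal is one possible way to do it rather than the actual implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SourceNamesDemo {
  public static void main(String[] args) {
    DemoPlanNode join = new DemoPlanNode(
        new DemoDataSourceNode("ORDERS"), new DemoDataSourceNode("USERS"));
    DemoPlanNode output = new DemoPlanNode(join);
    System.out.println(SourceNamesSketch.getSourceNames(output));
  }
}

// Hypothetical stand-in for PlanNode: a node with zero or more children.
class DemoPlanNode {
  final List<DemoPlanNode> children;

  DemoPlanNode(DemoPlanNode... children) {
    this.children = Arrays.asList(children);
  }
}

// Hypothetical stand-in for DataSourceNode: a leaf that reads from a named source.
final class DemoDataSourceNode extends DemoPlanNode {
  final String sourceName;

  DemoDataSourceNode(String sourceName) {
    super();
    this.sourceName = sourceName;
  }
}

final class SourceNamesSketch {
  static Set<String> getSourceNames(DemoPlanNode outputNode) {
    // A set, so a source used twice (e.g. a self-join) is reported once.
    Set<String> names = new LinkedHashSet<>();
    collect(outputNode, names);
    return names;
  }

  private static void collect(DemoPlanNode node, Set<String> names) {
    if (node instanceof DemoDataSourceNode) {
      names.add(((DemoDataSourceNode) node).sourceName);
    }
    for (DemoPlanNode child : node.children) {
      collect(child, names);
    }
  }
}
```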
Execution Plan Summary¶
String buildPlanSummary(
QueryId queryId,
ExecutionStep<?> plan)
buildPlanSummary...FIXME
buildPlanSummary is used when:
- EngineExecutor is requested to execute transient, stream pull and persistent queries (and requests the QueryRegistry to createTransientQuery, createStreamPullQuery and createOrReplacePersistentQuery, respectively)