EngineExecutor

Creating Instance

EngineExecutor takes the following to be created:

  • EngineContext
  • ServiceContext
  • SessionConfig

EngineExecutor is created using the create factory.

Creating EngineExecutor

EngineExecutor create(
  EngineContext engineContext,
  ServiceContext serviceContext,
  SessionConfig config)

create creates an EngineExecutor.


create is a convenience static factory method that does nothing but instantiate a new EngineExecutor. That little programming trick makes for more readable, fluent client code.

EngineExecutor
  .create(...)
  .plan(statement)
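The static-factory idiom described above can be sketched as follows. This is a minimal illustration, not the ksqlDB source: Ctx, Services, Config and SketchExecutor are hypothetical stand-ins for EngineContext, ServiceContext, SessionConfig and EngineExecutor, and the private constructor and the String returned by plan are assumptions made for the example.

```java
// Hypothetical stand-ins for EngineContext, ServiceContext and SessionConfig.
final class Ctx {}
final class Services {}
final class Config {}

// Hypothetical stand-in for EngineExecutor.
final class SketchExecutor {
  private final Ctx ctx;
  private final Services services;
  private final Config config;

  // Assumption: the constructor is hidden so that callers go through create.
  private SketchExecutor(Ctx ctx, Services services, Config config) {
    this.ctx = ctx;
    this.services = services;
    this.config = config;
  }

  // The factory does nothing but call the constructor.
  static SketchExecutor create(Ctx ctx, Services services, Config config) {
    return new SketchExecutor(ctx, services, config);
  }

  // Placeholder for planning: returns a description instead of a real plan.
  String plan(String statement) {
    return "plan for: " + statement;
  }
}

final class StaticFactoryDemo {
  public static void main(String[] args) {
    // The factory call chains directly into the next method call.
    String plan = SketchExecutor
        .create(new Ctx(), new Services(), new Config())
        .plan("CREATE STREAM s AS SELECT * FROM t;");
    System.out.println(plan);
  }
}
```

The benefit is purely cosmetic: the call chain starts with a named method rather than a parenthesized new expression.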

create is used when:

Planning Query for Execution

ExecutorPlans planQuery(
  ConfiguredStatement<?> statement,
  Query query,
  Optional<Sink> sink,
  Optional<String> withQueryId,
  MetaStore metaStore)

In summary, planQuery takes a Query statement and creates an ExecutorPlans.


The optional Sink and query ID are only given when EngineExecutor is requested to plan a QueryContainer. Otherwise, both are empty.


planQuery is used when:

Step 1. Creating QueryEngine

planQuery requests the EngineContext to create a QueryEngine (for the ServiceContext).

Note

planQuery creates a QueryEngine every time it is executed.

Step 2. Building Logical Plan

planQuery builds a logical query plan (an OutputNode).

planQuery creates a LogicalPlanNode (for the query statement text and the OutputNode) and a QueryId. planQuery then looks up a PersistentQueryMetadata for the QueryId in the QueryRegistry (if one exists).

Step 3. Building Physical Plan

planQuery builds a physical query plan (a PhysicalPlan).

Step 4. Creating ExecutorPlans

In the end, planQuery creates an ExecutorPlans (with the LogicalPlanNode and the PhysicalPlan).
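The four steps above can be sketched as a small pipeline. This is only an illustration under stated assumptions: OutNode, LogicalNode, PhysPlan, Plans and SketchQueryEngine are hypothetical stand-ins for OutputNode, LogicalPlanNode, PhysicalPlan, ExecutorPlans and QueryEngine, the generated query ID is made up, and the PersistentQueryMetadata lookup is left out.

```java
import java.util.Optional;

// Hypothetical stand-in for OutputNode.
final class OutNode {
  final String sinkName;
  OutNode(String sinkName) { this.sinkName = sinkName; }
}

// Hypothetical stand-in for LogicalPlanNode (statement text + output node).
final class LogicalNode {
  final String statementText;
  final OutNode outputNode;
  LogicalNode(String statementText, OutNode outputNode) {
    this.statementText = statementText;
    this.outputNode = outputNode;
  }
}

// Hypothetical stand-in for PhysicalPlan.
final class PhysPlan {
  final String queryId;
  PhysPlan(String queryId) { this.queryId = queryId; }
}

// Hypothetical stand-in for ExecutorPlans.
final class Plans {
  final LogicalNode logicalPlan;
  final PhysPlan physicalPlan;
  Plans(LogicalNode logicalPlan, PhysPlan physicalPlan) {
    this.logicalPlan = logicalPlan;
    this.physicalPlan = physicalPlan;
  }
}

// Hypothetical stand-in for QueryEngine.
final class SketchQueryEngine {
  OutNode buildLogicalPlan(String statementText, Optional<String> sink) {
    return new OutNode(sink.orElse("<transient>"));
  }

  PhysPlan buildPhysicalPlan(LogicalNode logicalPlan, String queryId) {
    return new PhysPlan(queryId);
  }
}

final class PlanQuerySketch {
  static Plans planQuery(
      String statementText, Optional<String> sink, Optional<String> withQueryId) {
    // Step 1: a new engine is created on every call.
    SketchQueryEngine engine = new SketchQueryEngine();
    // Step 2: logical plan (output node wrapped with the statement text).
    OutNode outputNode = engine.buildLogicalPlan(statementText, sink);
    LogicalNode logicalPlan = new LogicalNode(statementText, outputNode);
    // Assumption: the fallback ID is invented for this sketch.
    String queryId = withQueryId.orElse("SKETCH_QUERY_0");
    // Step 3: physical plan.
    PhysPlan physicalPlan = engine.buildPhysicalPlan(logicalPlan, queryId);
    // Step 4: bundle both plans.
    return new Plans(logicalPlan, physicalPlan);
  }
}
```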

KSQL Statement Planning

KsqlPlan plan(
  ConfiguredStatement<?> statement)

plan makes sure that the Statement (of the given ConfiguredStatement) is executable and throws a KsqlStatementException if not.

plan branches off based on the type of the statement:


plan is used when:

ExecutableDdlStatement

For an ExecutableDdlStatement, plan determines whether it is a CreateStream or a CreateTable (possibly SOURCEs).

For a source CreateTable, plan executes sourceTablePlan.

Otherwise, plan requests the EngineContext to create a DdlCommand and then plans it.

QueryContainer

Otherwise, plan assumes that the Statement is a QueryContainer and plans the query (with the Sink, among others), which gives a PhysicalPlan.

plan executes maybeCreateSinkDdl.

plan creates a QueryPlan.

In the end, plan creates a KsqlPlanV1.

Exceptions

plan throws a KsqlStatementException for a non-executable statement.

plan throws a KsqlStatementException for a source CreateStream or CreateTable when the ksql.source.table.materialization.enabled configuration property is disabled:

Cannot execute command because source table materialization is disabled.
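The branching and the two exceptions can be sketched as below. This is a hedged illustration only: the Sketch* types are hypothetical stand-ins for the ksqlDB statement classes and KsqlStatementException, the "not executable" message is invented, and plan returns the name of the chosen path rather than a real KsqlPlan.

```java
// Hypothetical marker for any parsed statement.
interface SketchStatement {}

// Stand-in for CreateStream / CreateTable (an ExecutableDdlStatement).
final class SketchCreate implements SketchStatement {
  final boolean table;
  final boolean source;
  SketchCreate(boolean table, boolean source) {
    this.table = table;
    this.source = source;
  }
}

// Stand-in for a QueryContainer such as CREATE STREAM ... AS SELECT.
final class SketchQueryContainer implements SketchStatement {}

// Stand-in for a statement that cannot be planned.
final class SketchShow implements SketchStatement {}

// Stand-in for KsqlStatementException.
final class SketchStatementException extends RuntimeException {
  SketchStatementException(String message) { super(message); }
}

final class PlanBranchingSketch {
  static String plan(SketchStatement statement, boolean sourceMaterializationEnabled) {
    boolean executable =
        statement instanceof SketchCreate || statement instanceof SketchQueryContainer;
    if (!executable) {
      // Assumption: the real message differs.
      throw new SketchStatementException("Statement is not executable");
    }
    if (statement instanceof SketchCreate) {
      SketchCreate create = (SketchCreate) statement;
      if (create.source && !sourceMaterializationEnabled) {
        throw new SketchStatementException(
            "Cannot execute command because source table materialization is disabled.");
      }
      // A source table gets its own query plan; other DDL becomes a DDL command.
      return (create.table && create.source) ? "sourceTablePlan" : "ddlCommand";
    }
    // Everything else that is executable is treated as a QueryContainer.
    return "queryPlan";
  }
}
```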

maybeCreateSinkDdl

Optional<DdlCommand> maybeCreateSinkDdl(
  ConfiguredStatement<?> cfgStatement,
  KsqlStructuredDataOutputNode outputNode)

For a KsqlStructuredDataOutputNode with no createInto, maybeCreateSinkDdl executes validateExistingSink and returns an empty value.

Otherwise, maybeCreateSinkDdl requests the EngineContext to createDdlCommand for the given KsqlStructuredDataOutputNode.
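As a rough sketch of the two outcomes, assuming a hypothetical SketchSinkNode in place of KsqlStructuredDataOutputNode and a plain String in place of DdlCommand (sink validation is reduced to a comment):

```java
import java.util.Optional;

// Hypothetical stand-in for KsqlStructuredDataOutputNode.
final class SketchSinkNode {
  final String sinkName;
  final boolean createInto;
  SketchSinkNode(String sinkName, boolean createInto) {
    this.sinkName = sinkName;
    this.createInto = createInto;
  }
}

final class SinkDdlSketch {
  // Returns a DDL command only when the query has to create its sink.
  static Optional<String> maybeCreateSinkDdl(SketchSinkNode outputNode) {
    if (!outputNode.createInto) {
      // The sink already exists (e.g. INSERT INTO); it would be validated here.
      return Optional.empty();
    }
    // Assumption: a String stands in for the DdlCommand built by the EngineContext.
    return Optional.of("CREATE " + outputNode.sinkName);
  }
}
```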

Planning Source Table Query

KsqlPlan sourceTablePlan(
  ConfiguredStatement<?> statement)

sourceTablePlan assumes that the given ConfiguredStatement is for a CreateTable.

sourceTablePlan requests the EngineContext to create a CreateTableCommand.

sourceTablePlan creates a Query with the following:

  • AliasedRelation for the source table
  • Select
  • RefinementInfo with CHANGES output refinement
  • pullQuery flag off

sourceTablePlan registers a KsqlTable in a temporary MetaStoreImpl (so any future table name resolution will work).

sourceTablePlan plans the query statement (with the given ConfiguredStatement, the Query, no sink and the temporary MetaStoreImpl). It will produce an ExecutorPlans.

sourceTablePlan uses the ExecutorPlans to access a KsqlBareOutputNode.

sourceTablePlan creates a QueryPlan with the following:

sourceTablePlan requests the EngineContext for the QueryValidator to validateQuery.

In the end, sourceTablePlan creates a KsqlPlan (a KsqlPlanV1).

Executing KsqlPlan

ExecuteResult execute(
  KsqlPlan plan)

execute follows different "execution paths" for the different variants of KsqlPlan, mainly depending on whether a DdlCommand is available or not.

execute throws an IllegalStateException when the given KsqlPlan has neither physical query plan nor DDL command:

DdlResult should be present if there is no physical plan.

execute is used when:

DDL Command

For a KsqlPlan with no physical query plan, execute executes the DDL command of the KsqlPlan (with the withQuery flag off) and returns an ExecuteResult.

QueryPlan

Physical Plan with optional DdlCommand

The given KsqlPlan may have a physical plan with or without a DdlCommand.

For a KsqlPlan with a physical query plan, execute executes a DDL command (if available) and then the persistent query.
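The execution paths can be sketched as follows. This is an illustration under assumptions, not the actual implementation: SketchKsqlPlan is a hypothetical stand-in for KsqlPlan with Strings in place of DdlCommand and QueryPlan, the returned Strings replace ExecuteResult, and passing withQuery as true on the query path is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for KsqlPlan: both parts are optional.
final class SketchKsqlPlan {
  final Optional<String> ddlCommand;
  final Optional<String> queryPlan;
  SketchKsqlPlan(Optional<String> ddlCommand, Optional<String> queryPlan) {
    this.ddlCommand = ddlCommand;
    this.queryPlan = queryPlan;
  }
}

final class ExecuteSketch {
  // Records what was executed, in order.
  final List<String> log = new ArrayList<>();

  String execute(SketchKsqlPlan plan) {
    if (!plan.queryPlan.isPresent()) {
      // DDL-only path: a DDL command is mandatory here.
      String ddl = plan.ddlCommand.orElseThrow(() -> new IllegalStateException(
          "DdlResult should be present if there is no physical plan."));
      return executeDdl(ddl, false);
    }
    // Query path: run the DDL command first, if there is one.
    // Assumption: withQuery is on for this path.
    plan.ddlCommand.ifPresent(ddl -> executeDdl(ddl, true));
    log.add("query:" + plan.queryPlan.get());
    return "query started";
  }

  private String executeDdl(String ddl, boolean withQuery) {
    log.add("ddl:" + ddl + ",withQuery=" + withQuery);
    return "ddl executed";
  }
}
```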

Executing Persistent Query

PersistentQueryMetadata executePersistentQuery(
  QueryPlan queryPlan,
  String statementText,
  KsqlConstants.PersistentQueryType persistentQueryType)

executePersistentQuery requests the EngineContext for the QueryRegistry to create or replace a persistent query (for the given QueryPlan and PersistentQueryType).

Executing Scalable Push Query

Signature
ScalablePushQueryMetadata executeScalablePushQuery(
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statement,
  PushRouting pushRouting,
  PushRoutingOptions pushRoutingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Context context,
  Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)

executeScalablePushQuery builds a logical plan of the query (with isScalablePush flag enabled).

In the end, executeScalablePushQuery creates a ScalablePushQueryMetadata (with a new TransientQueryQueue).

Usage

executeScalablePushQuery is used when:

Executing Stream Pull Query

Signature
TransientQueryMetadata executeStreamPullQuery(
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones,
  ImmutableMap<TopicPartition, Long> endOffsets)

executeStreamPullQuery plans the query (with an empty sink and query ID).

executeStreamPullQuery requests the EngineContext to create a QueryValidator to validateQuery (with the SessionConfig and the ExecutionPlan with ExecutionSteps).

In the end, executeStreamPullQuery requests the EngineContext for the QueryRegistry to create a stream pull query.

Usage

executeStreamPullQuery is used when:

Executing Transient Query

Signature
TransientQueryMetadata executeTransientQuery(
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones)

executeTransientQuery...FIXME

In the end, executeTransientQuery requests the EngineContext for the QueryRegistry to create a transient query.

Usage

executeTransientQuery is used when:

Executing Table Pull Query

PullQueryResult executeTablePullQuery(
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statement,
  HARouting routing,
  RoutingOptions routingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Optional<PullQueryExecutorMetrics> pullQueryMetrics,
  boolean startImmediately,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

executeTablePullQuery executes buildAndValidateLogicalPlan followed by buildPullPhysicalPlan.

executeTablePullQuery creates a PullQueryQueue and a PullQueryQueuePopulator (to request the given HARouting to handlePullQuery).

executeTablePullQuery creates a PullQueryResult (and starts it when the given startImmediately flag is enabled).
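The relationship between the queue, the populator and the startImmediately flag can be sketched as below. The types are hypothetical stand-ins (SketchPullResult for PullQueryResult, a BlockingQueue for PullQueryQueue, a Runnable for PullQueryQueuePopulator), and the populator here runs synchronously, which is a simplifying assumption.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PullQueryResult: owns the queue and its populator.
final class SketchPullResult {
  private final BlockingQueue<String> queue;
  private final Runnable populator;
  private boolean started;

  SketchPullResult(BlockingQueue<String> queue, Runnable populator) {
    this.queue = queue;
    this.populator = populator;
  }

  // Runs the populator once; a second call is rejected.
  void start() {
    if (started) {
      throw new IllegalStateException("already started");
    }
    started = true;
    populator.run();
  }

  BlockingQueue<String> queue() { return queue; }

  boolean isStarted() { return started; }
}

final class TablePullSketch {
  static SketchPullResult executeTablePullQuery(List<String> rows, boolean startImmediately) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stands in for asking the routing layer to handle the pull query.
    Runnable populator = () -> queue.addAll(rows);
    SketchPullResult result = new SketchPullResult(queue, populator);
    if (startImmediately) {
      result.start();
    }
    return result;
  }
}
```

With startImmediately off, the caller decides when rows start flowing into the queue by calling start itself.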

executeTablePullQuery is used when:

Building Query Logical Plan

LogicalPlanNode buildAndValidateLogicalPlan(
  ConfiguredStatement<?> statement,
  ImmutableAnalysis analysis,
  KsqlConfig config,
  QueryPlannerOptions queryPlannerOptions,
  boolean isScalablePush)

buildAndValidateLogicalPlan is given a statement (along with the analysis) and the isScalablePush flag as follows:

Naming

Since buildAndValidateLogicalPlan is to execute buildQueryLogicalPlan, it'd make sense to name it alike, wouldn't it?

After all, buildAndValidateLogicalPlan is a facade to LogicalPlanner.


buildAndValidateLogicalPlan creates a LogicalPlanner to build a logical plan of a query (that gives an OutputNode).

In the end, buildAndValidateLogicalPlan creates a LogicalPlanNode (with the statement text of the given ConfiguredStatement and the created OutputNode).


buildAndValidateLogicalPlan is used when:

Executing DDL Command

String executeDdl(
  DdlCommand ddlCommand,
  String statementText,
  boolean withQuery,
  Set<SourceName> withQuerySources,
  boolean restoreInProgress)

executeDdl requests the EngineContext to executeDdl.

getSourceNames

Set<SourceName> getSourceNames(
  PlanNode outputNode)

getSourceNames gives the names of all DataSources (via DataSourceNodes) in the given PlanNode.
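Collecting source names amounts to a walk over the plan tree. A minimal sketch, assuming hypothetical SketchPlanNode and SketchDataSourceNode types in place of PlanNode and DataSourceNode and plain Strings in place of SourceName (the traversal strategy is an assumption):

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for PlanNode: a node with zero or more children.
class SketchPlanNode {
  final List<SketchPlanNode> children;
  SketchPlanNode(SketchPlanNode... children) {
    this.children = Arrays.asList(children);
  }
}

// Hypothetical stand-in for DataSourceNode: a leaf that names a data source.
final class SketchDataSourceNode extends SketchPlanNode {
  final String sourceName;
  SketchDataSourceNode(String sourceName) {
    super();
    this.sourceName = sourceName;
  }
}

final class SourceNamesSketch {
  // Gathers the names of all data-source nodes reachable from the given node.
  static Set<String> getSourceNames(SketchPlanNode outputNode) {
    Set<String> names = new LinkedHashSet<>();
    collect(outputNode, names);
    return names;
  }

  // Depth-first walk; duplicates collapse because the result is a set.
  private static void collect(SketchPlanNode node, Set<String> names) {
    if (node instanceof SketchDataSourceNode) {
      names.add(((SketchDataSourceNode) node).sourceName);
    }
    for (SketchPlanNode child : node.children) {
      collect(child, names);
    }
  }
}
```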


getSourceNames is used when:

Execution Plan Summary

String buildPlanSummary(
  QueryId queryId,
  ExecutionStep<?> plan)

buildPlanSummary...FIXME


buildPlanSummary is used when: