Creating Instance

EngineExecutor takes the following to be created: an EngineContext, a ServiceContext, and a SessionConfig.

EngineExecutor is created using the create factory method.

Creating EngineExecutor

EngineExecutor create(
  EngineContext engineContext,
  ServiceContext serviceContext,
  SessionConfig config)

create creates an EngineExecutor.

create is a convenience static factory method that does nothing but call new EngineExecutor. This small trick makes for more readable, fluent client code.
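The static-factory idiom described above can be sketched as follows. This is a minimal illustration, not the ksqlDB source: the Executor class is a hypothetical stand-in for EngineExecutor, plain Strings replace EngineContext, ServiceContext and SessionConfig, and the private constructor is an assumption of the sketch.

```java
// Hypothetical stand-in for EngineExecutor. The real class takes an
// EngineContext, a ServiceContext and a SessionConfig rather than Strings.
final class Executor {
  private final String engineContext;
  private final String serviceContext;
  private final String config;

  // Assumed private so that callers have to go through create(...).
  private Executor(String engineContext, String serviceContext, String config) {
    this.engineContext = engineContext;
    this.serviceContext = serviceContext;
    this.config = config;
  }

  // Static factory that does nothing but call the constructor.
  static Executor create(String engineContext, String serviceContext, String config) {
    return new Executor(engineContext, serviceContext, config);
  }

  // Helper for the sketch only: shows what the instance was created with.
  String describe() {
    return engineContext + "/" + serviceContext + "/" + config;
  }
}
```

Client code then reads as Executor.create(...).describe() without an explicit new, which is the readability gain the factory is after.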


create is used when:

Planning Query for Execution (planQuery)

ExecutorPlans planQuery(
  ConfiguredStatement<?> statement,
  Query query,
  Optional<Sink> sink,
  Optional<String> withQueryId,
  MetaStore metaStore)

planQuery takes a Query and creates an ExecutorPlans.

The optional Sink and query ID are only given when EngineExecutor is requested to plan a QueryContainer. Otherwise, they are both undefined.

planQuery is used when:

Step 1. Creating QueryEngine

planQuery requests the EngineContext to create a QueryEngine (for the ServiceContext).


planQuery creates a QueryEngine every time it is executed.

Step 2. Building Logical Plan

planQuery builds a logical query plan (an OutputNode).

planQuery creates a LogicalPlanNode (for the query statement text and the OutputNode) and a QueryId, and then looks up a PersistentQueryMetadata (in the QueryRegistry) for the QueryId (if one exists).

Step 3. Building Physical Plan

planQuery builds a physical query plan (a PhysicalPlan).

Step 4. Creating ExecutorPlans

In the end, planQuery creates an ExecutorPlans (with the LogicalPlanNode and the PhysicalPlan).
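The four steps above can be sketched as a small pipeline. All types below are simplified, hypothetical stand-ins (Strings instead of real plan nodes), and the way the query ID is chosen (the given ID, else a placeholder) is an assumption of this sketch rather than the actual ksqlDB logic.

```java
import java.util.Optional;

// Simplified, hypothetical stand-ins; none of these are the real ksqlDB types.
final class PlanQuerySketch {

  static final class LogicalPlanNode {
    final String statementText;
    final String outputNode;

    LogicalPlanNode(String statementText, String outputNode) {
      this.statementText = statementText;
      this.outputNode = outputNode;
    }
  }

  static final class PhysicalPlan {
    final String queryId;
    final String topology;

    PhysicalPlan(String queryId, String topology) {
      this.queryId = queryId;
      this.topology = topology;
    }
  }

  static final class ExecutorPlans {
    final LogicalPlanNode logicalPlan;
    final PhysicalPlan physicalPlan;

    ExecutorPlans(LogicalPlanNode logicalPlan, PhysicalPlan physicalPlan) {
      this.logicalPlan = logicalPlan;
      this.physicalPlan = physicalPlan;
    }
  }

  // Hypothetical engine that knows how to build both kinds of plans.
  static final class QueryEngine {
    String buildLogicalPlan(String statementText) {
      return "OutputNode(" + statementText + ")";
    }

    PhysicalPlan buildPhysicalPlan(LogicalPlanNode plan, String queryId) {
      return new PhysicalPlan(queryId, "Topology(" + plan.outputNode + ")");
    }
  }

  static ExecutorPlans planQuery(String statementText, Optional<String> withQueryId) {
    // Step 1: a new query engine is created on every call.
    QueryEngine queryEngine = new QueryEngine();

    // Step 2: build the logical plan and pair it with the statement text.
    String outputNode = queryEngine.buildLogicalPlan(statementText);
    LogicalPlanNode logicalPlan = new LogicalPlanNode(statementText, outputNode);

    // Assumption of this sketch: use the given query ID, else a placeholder.
    String queryId = withQueryId.orElse("GENERATED_ID");

    // Step 3: build the physical plan from the logical one.
    PhysicalPlan physicalPlan = queryEngine.buildPhysicalPlan(logicalPlan, queryId);

    // Step 4: bundle both plans together.
    return new ExecutorPlans(logicalPlan, physicalPlan);
  }
}
```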

Planning ExecutableDdlStatement or QueryContainer (plan)

KsqlPlan plan(
  ConfiguredStatement<?> statement)

plan requests the given ConfiguredStatement for the Statement and branches off based on the type of the statement:

plan is used when:


For an ExecutableDdlStatement, plan determines whether it is a CreateStream or a CreateTable. They are both supposed to be a source.

For a source CreateTable, plan executes sourceTablePlan. Otherwise, plan requests the EngineContext to create a DdlCommand and then creates a KsqlPlanV1.


Otherwise, plan assumes that the Statement is a QueryContainer and plans the query (with the Sink, among others), which gives a PhysicalPlan.

plan executes maybeCreateSinkDdl.

plan creates a QueryPlan.

In the end, plan creates a KsqlPlanV1.


plan throws a KsqlStatementException for a non-executable statement.

plan throws a KsqlStatementException for a source CreateStream or a source CreateTable when the ksql.source.table.materialization.enabled configuration property is disabled:

Cannot execute command because source table materialization is disabled.
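The branching described above can be sketched as follows. The statement classes are hypothetical stand-ins, the returned Strings merely name the branch that was taken, and standard Java exceptions replace KsqlStatementException. The order of the checks is simplified and may differ from the real method.

```java
// Hypothetical model of the branching in plan; not the ksqlDB types.
final class PlanBranchSketch {

  interface Statement {}

  // Stand-in for CreateStream (isTable = false) and CreateTable (isTable = true).
  static final class CreateSource implements Statement {
    final boolean isTable;
    final boolean isSource;

    CreateSource(boolean isTable, boolean isSource) {
      this.isTable = isTable;
      this.isSource = isSource;
    }
  }

  // Stand-in for a statement that contains a query and a sink.
  static final class QueryContainer implements Statement {}

  // Stand-in for a statement that cannot be planned.
  static final class ListStreams implements Statement {}

  static String plan(Statement statement, boolean materializationEnabled) {
    if (statement instanceof CreateSource) {
      CreateSource create = (CreateSource) statement;
      if (create.isSource && !materializationEnabled) {
        throw new IllegalStateException(
            "Cannot execute command because source table materialization is disabled.");
      }
      // Only a source table gets the dedicated source table plan.
      return create.isTable && create.isSource ? "sourceTablePlan" : "ddlPlan";
    }
    if (statement instanceof QueryContainer) {
      return "queryPlan";
    }
    throw new IllegalArgumentException("Cannot plan a non-executable statement");
  }
}
```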


Optional<DdlCommand> maybeCreateSinkDdl(
  ConfiguredStatement<?> cfgStatement,
  KsqlStructuredDataOutputNode outputNode)

For a KsqlStructuredDataOutputNode with no createInto, maybeCreateSinkDdl executes validateExistingSink and returns an empty value.

Otherwise, maybeCreateSinkDdl requests the EngineContext to create a DdlCommand (createDdlCommand) for the given KsqlStructuredDataOutputNode.
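The Optional-returning shape of maybeCreateSinkDdl can be sketched as below. The parameters are hypothetical simplifications (a sink name and a createInto flag in place of the KsqlStructuredDataOutputNode), the DDL command is modelled as a String, and the validation step is reduced to a comment.

```java
import java.util.Optional;

// Hypothetical, simplified model; a String stands in for DdlCommand.
final class SinkDdlSketch {

  static Optional<String> maybeCreateSinkDdl(String sinkName, boolean createInto) {
    if (!createInto) {
      // The sink already exists, so it would only be validated here
      // (validateExistingSink in the text above). No DDL is needed.
      return Optional.empty();
    }
    // A new sink has to be created, so a DDL command is produced for it.
    return Optional.of("CREATE " + sinkName);
  }
}
```

Callers can then treat "no DDL needed" and "DDL to run first" uniformly, for example with ifPresent.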


KsqlPlan sourceTablePlan(
  ConfiguredStatement<?> statement)

sourceTablePlan assumes that the given ConfiguredStatement is for a CreateTable.


Executing KsqlPlan (DdlCommand or Persistent Query)

ExecuteResult execute(
  KsqlPlan plan)


execute throws an IllegalStateException when the given KsqlPlan has neither a physical plan nor a DdlCommand.

For the given KsqlPlan with no physical plan, execute executes the DDL command (of the KsqlPlan with the withQuery flag off) and returns.

Physical Plan with optional DdlCommand

The given KsqlPlan may have a physical plan with or without a DdlCommand.

Otherwise, for the given KsqlPlan with a physical plan, execute executes a DDL command (if available) and then the persistent query.
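The three cases of execute described above can be sketched as follows. The two Optional parameters are hypothetical stand-ins for the DdlCommand and the physical plan of a KsqlPlan, and the returned list simply records which steps would run, in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of execute; Strings stand in for the real plan parts.
final class ExecuteSketch {

  static List<String> execute(Optional<String> ddlCommand, Optional<String> queryPlan) {
    List<String> steps = new ArrayList<>();

    if (!queryPlan.isPresent()) {
      // No physical plan: a DDL command is required (run with withQuery off).
      String ddl = ddlCommand.orElseThrow(
          () -> new IllegalStateException("Neither a physical plan nor a DDL command"));
      steps.add("ddl: " + ddl);
      return steps;
    }

    // Physical plan present: run the optional DDL first, then the query.
    ddlCommand.ifPresent(ddl -> steps.add("ddl: " + ddl));
    steps.add("persistent query: " + queryPlan.get());
    return steps;
  }
}
```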

execute is used when:

Executing Persistent Query

PersistentQueryMetadata executePersistentQuery(
  QueryPlan queryPlan,
  String statementText,
  KsqlConstants.PersistentQueryType persistentQueryType)

executePersistentQuery requests the EngineContext for the QueryRegistry to create or replace a persistent query (for the given QueryPlan and PersistentQueryType).

Executing Scalable Push Query

ScalablePushQueryMetadata executeScalablePushQuery(
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statement,
  PushRouting pushRouting,
  PushRoutingOptions pushRoutingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Context context,
  Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics)

executeScalablePushQuery builds and validates a logical plan (buildAndValidateLogicalPlan with the isScalablePush flag enabled).

In the end, executeScalablePushQuery creates a ScalablePushQueryMetadata (with a new TransientQueryQueue).

executeScalablePushQuery is used when:

Executing Stream Pull Query

TransientQueryMetadata executeStreamPullQuery(
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones,
  ImmutableMap<TopicPartition, Long> endOffsets)


executeStreamPullQuery is used when:

Executing Transient Query

TransientQueryMetadata executeTransientQuery(
  ConfiguredStatement<Query> statement,
  boolean excludeTombstones)


In the end, executeTransientQuery requests the EngineContext for the QueryRegistry to create a transient query.

executeTransientQuery is used when:

Executing Table Pull Query

PullQueryResult executeTablePullQuery(
  ImmutableAnalysis analysis,
  ConfiguredStatement<Query> statement,
  HARouting routing,
  RoutingOptions routingOptions,
  QueryPlannerOptions queryPlannerOptions,
  Optional<PullQueryExecutorMetrics> pullQueryMetrics,
  boolean startImmediately,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

executeTablePullQuery executes buildAndValidateLogicalPlan followed by buildPullPhysicalPlan.

executeTablePullQuery creates a PullQueryQueue and a PullQueryQueuePopulator (to request the given HARouting to handlePullQuery).

executeTablePullQuery creates a PullQueryResult (and starts it when the given startImmediately flag is enabled).

executeTablePullQuery is used when:

Building Logical Plan of Query (buildAndValidateLogicalPlan)

LogicalPlanNode buildAndValidateLogicalPlan(
  ConfiguredStatement<?> statement,
  ImmutableAnalysis analysis,
  KsqlConfig config,
  QueryPlannerOptions queryPlannerOptions,
  boolean isScalablePush)

buildAndValidateLogicalPlan is given a statement (along with the analysis) and the isScalablePush flag as follows:


Since buildAndValidateLogicalPlan executes buildQueryLogicalPlan, it would make sense for the two to be named alike.

After all, buildAndValidateLogicalPlan is a facade to LogicalPlanner.

buildAndValidateLogicalPlan creates a LogicalPlanner to build a logical plan of a query (that gives an OutputNode).

In the end, buildAndValidateLogicalPlan creates a LogicalPlanNode (with the statement text of the given ConfiguredStatement and the OutputNode).

buildAndValidateLogicalPlan is used when:
