

InteractiveStatementExecutor is used by CommandRunner to handle commands (in restore and regular operation modes).

Creating Instance

InteractiveStatementExecutor takes the following to be created:

InteractiveStatementExecutor is created when:


InteractiveStatementExecutor is given a KsqlEngine when created.

The KsqlEngine is used mainly for the following:

The KsqlEngine is also used when:


InteractiveStatementExecutor is given a ServiceContext when created.

The ServiceContext is used to execute a KsqlPlan (using the KsqlExecutionContext).


InteractiveStatementExecutor is given a StatementParser when created.

The StatementParser is used in handleStatementWithTerminatedQueries (to parse a KSQL statement).

Executing Queued Command

void handleStatement(
  QueuedCommand queuedCommand)

handleStatement handles the Command of the given QueuedCommand in EXECUTE mode.

handleStatement is used when:

Handling Command

void handleStatementWithTerminatedQueries(
  Command command,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  Mode mode,
  long offset)

handleStatementWithTerminatedQueries handles the given Command based on whether it has a KsqlPlan or just a statement.

handleStatementWithTerminatedQueries is used when:


If the given Command has a KsqlPlan, handleStatementWithTerminatedQueries executes the KsqlPlan.


handleStatementWithTerminatedQueries puts PARSING status.

handleStatementWithTerminatedQueries requests the StatementParser to parse the statement (from the command).

handleStatementWithTerminatedQueries puts EXECUTING status.

In the end, handleStatementWithTerminatedQueries executes the parsed statement (executeStatement).
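The branching described above can be sketched roughly as follows. This is an illustration only: HandleCommandSketch, its Command class, and the step strings are hypothetical stand-ins, not ksqlDB classes, and the plan is reduced to a plain string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins; none of these are the real ksqlDB classes.
final class HandleCommandSketch {

  /** Simplified command: always carries a statement, optionally a plan. */
  static final class Command {
    final String statement;
    final Optional<String> plan;

    Command(final String statement, final Optional<String> plan) {
      this.statement = statement;
      this.plan = plan;
    }
  }

  /** Returns the steps taken, in order, so the control flow can be inspected. */
  static List<String> handle(final Command command) {
    final List<String> steps = new ArrayList<>();
    if (command.plan.isPresent()) {
      // Plan-based command: no parsing, the plan is executed directly.
      steps.add("executePlan:" + command.plan.get());
      return steps;
    }
    // Statement-only command: parse first, then execute.
    steps.add("status:PARSING");
    steps.add("parse:" + command.statement);
    steps.add("status:EXECUTING");
    steps.add("executeStatement");
    return steps;
  }
}
```

A command with a plan yields a single executePlan step, while a statement-only command goes through PARSING, parsing, EXECUTING and executeStatement, in that order.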

Executing KsqlPlan

void executePlan(
  Command command,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  KsqlPlan plan,
  Mode mode,
  long offset,
  boolean restoreInProgress)

executePlan builds merged config.

executePlan puts EXECUTING status for the command with the message:

Executing statement

executePlan requests the KsqlEngine to execute the given KsqlPlan.

executePlan increments queryIdGenerator internal counter.

If executing the plan produced a query (a QueryMetadata) and the given mode is EXECUTE, executePlan requests the query to start.

In the end, executePlan puts SUCCESS final status with the command result or the following message:

Created query with ID [queryId]
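A minimal sketch of the status sequence above, under stated assumptions: ExecutePlanSketch, Query, and the RESTORE constant are hypothetical names introduced for illustration, the engine call is replaced by arguments describing its outcome, and the "(none)" fallback is not part of the original.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins; the real executor delegates to the KsqlEngine.
final class ExecutePlanSketch {

  enum Mode { RESTORE, EXECUTE } // RESTORE is an assumed name for the non-EXECUTE mode

  static final class Query {
    final String id;
    boolean started = false;

    Query(final String id) {
      this.id = id;
    }

    void start() {
      started = true;
    }
  }

  /** Returns the statuses that would be put for the command, in order. */
  static List<String> executePlan(
      final Optional<Query> query,
      final Optional<String> commandResult,
      final Mode mode) {
    final List<String> statuses = new ArrayList<>();
    statuses.add("EXECUTING: Executing statement");
    // The plan would be executed here; its outcome is passed in as arguments.
    if (query.isPresent() && mode == Mode.EXECUTE) {
      // Queries are started only in EXECUTE mode.
      query.get().start();
    }
    // Prefer the command result; otherwise report the created query
    // ("(none)" is a placeholder invented for this sketch).
    final String message = commandResult.orElseGet(
        () -> "Created query with ID " + query.map(q -> q.id).orElse("(none)"));
    statuses.add("SUCCESS: " + message);
    return statuses;
  }
}
```

In this sketch a query passed with Mode.RESTORE is left unstarted, which mirrors the condition that only EXECUTE mode starts the query.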

Executing Statement

void executeStatement(
  PreparedStatement<?> statement,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture)

executeStatement branches off based on the type of the statement.


For TerminateQuery, executeStatement terminates the query (terminateQuery) and puts SUCCESS final status with the following message:

Query terminated.


For ExecutableDdlStatement, executeStatement calls throwUnsupportedStatementError.


For CreateAsSelect, executeStatement calls throwUnsupportedStatementError.


For InsertInto, executeStatement calls throwUnsupportedStatementError.


For AlterSystemProperty, executeStatement requests the KsqlExecutionContext to alterSystemProperty and then to updateStreamsPropertiesAndRestartRuntime.

In the end, executeStatement puts SUCCESS final status with the following message:

System property [name] was set to [value].

Other Types

For all other types, executeStatement throws a KsqlException:

Unexpected statement type: [className]
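The type-based dispatch of executeStatement could look roughly like the sketch below. Every class here is a hypothetical marker type named after the statement kinds listed above, not the real ksqlDB parser class. The exception types are assumptions too: UnsupportedOperationException stands in for whatever throwUnsupportedStatementError throws, and IllegalStateException stands in for KsqlException.

```java
// Hypothetical marker types; they only mirror the statement kinds named in the text.
final class ExecuteStatementSketch {

  interface Statement {}

  static final class TerminateQuery implements Statement {}

  static final class ExecutableDdlStatement implements Statement {}

  static final class CreateAsSelect implements Statement {}

  static final class InsertInto implements Statement {}

  static final class AlterSystemProperty implements Statement {
    final String name;
    final String value;

    AlterSystemProperty(final String name, final String value) {
      this.name = name;
      this.value = value;
    }
  }

  /** Returns the success message, or throws for unsupported and unexpected types. */
  static String execute(final Statement statement) {
    if (statement instanceof TerminateQuery) {
      return "Query terminated.";
    }
    if (statement instanceof ExecutableDdlStatement
        || statement instanceof CreateAsSelect
        || statement instanceof InsertInto) {
      // Assumed exception type for the unsupported-statement branch.
      throw new UnsupportedOperationException(
          "Unsupported statement: " + statement.getClass().getSimpleName());
    }
    if (statement instanceof AlterSystemProperty) {
      final AlterSystemProperty alter = (AlterSystemProperty) statement;
      return "System property " + alter.name + " was set to " + alter.value + ".";
    }
    // Stand-in for the KsqlException thrown for all other types.
    throw new IllegalStateException(
        "Unexpected statement type: " + statement.getClass().getName());
  }
}
```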


void putFinalStatus(
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  CommandStatus status)

putFinalStatus associates the given CommandStatus with the CommandId (in statusStore registry).

If the given CommandStatusFuture is available, putFinalStatus sets its final status.
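The two steps of putFinalStatus can be illustrated with a small sketch. It assumes a plain map as the status store and a CompletableFuture in place of CommandStatusFuture; FinalStatusSketch, statusOf, and the string-typed ids and statuses are hypothetical simplifications.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in: ids and statuses are plain strings here.
final class FinalStatusSketch {

  // Plays the role of the statusStore registry.
  private final Map<String, String> statusStore = new ConcurrentHashMap<>();

  void putFinalStatus(
      final String commandId,
      final Optional<CompletableFuture<String>> statusFuture,
      final String status) {
    // Step 1: always record the status under the command id.
    statusStore.put(commandId, status);
    // Step 2: complete the future only if the caller supplied one.
    statusFuture.ifPresent(future -> future.complete(status));
  }

  /** Helper added for this sketch to read the registry back. */
  Optional<String> statusOf(final String commandId) {
    return Optional.ofNullable(statusStore.get(commandId));
  }
}
```

Passing Optional.empty() still records the status in the registry; only the notification of a waiting caller is skipped.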

putFinalStatus is used when:

Accessing KsqlEngine

KsqlExecutionContext getKsqlEngine()

getKsqlEngine gives the KsqlEngine.

getKsqlEngine is used when:

Throwing IllegalStateException (IfNotConfigured)

void throwIfNotConfigured()

throwIfNotConfigured throws an IllegalStateException when there is no application.server (Kafka Streams) configuration property among the getKsqlStreamConfigProps (of the KsqlConfig of the KsqlEngine):

No initialized
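As an illustration, the guard amounts to a key lookup in the Kafka Streams properties. The sketch below takes the properties as a plain map instead of reading them from the KsqlConfig; ConfigGuardSketch and the map parameter are assumptions made for the example.

```java
import java.util.Map;

// Hypothetical stand-in: the properties are passed in directly as a map.
final class ConfigGuardSketch {

  // Name of the Kafka Streams property the guard looks for.
  static final String APPLICATION_SERVER = "application.server";

  static void throwIfNotConfigured(final Map<String, ?> streamsProps) {
    if (!streamsProps.containsKey(APPLICATION_SERVER)) {
      // Message text as quoted in this document.
      throw new IllegalStateException("No initialized");
    }
  }
}
```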

throwIfNotConfigured is used when:


void handleRestore(
  QueuedCommand queuedCommand)


handleRestore is used when:


Enable ALL logging level for logger to see what happens inside.

Add the following line to

Refer to Logging.