InteractiveStatementExecutor

InteractiveStatementExecutor is used by CommandRunner to handle commands (in restore and regular operation modes).

Creating Instance

InteractiveStatementExecutor takes the following to be created:

InteractiveStatementExecutor is created when:

KsqlEngine

InteractiveStatementExecutor is given a KsqlEngine when created.

The KsqlEngine is used mainly for the following:

The KsqlEngine is also used when:

ServiceContext

InteractiveStatementExecutor is given a ServiceContext when created.

The ServiceContext is used to execute a KsqlPlan (using the KsqlExecutionContext).

StatementParser

InteractiveStatementExecutor is given a StatementParser when created.

The StatementParser is used in handleStatementWithTerminatedQueries (to parse a KSQL statement).

Executing Queued Command

void handleStatement(
  QueuedCommand queuedCommand)

handleStatement handles the Command of the given QueuedCommand in EXECUTE mode.


handleStatement is used when:

Handling Command

void handleStatementWithTerminatedQueries(
  Command command,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  Mode mode,
  long offset)

handleStatementWithTerminatedQueries handles the given Command based on whether it has a KsqlPlan or just a statement.


handleStatementWithTerminatedQueries is used when:

KsqlPlan

If the given Command has a KsqlPlan, handleStatementWithTerminatedQueries executes the KsqlPlan.

Statement

handleStatementWithTerminatedQueries puts PARSING status for the command.

handleStatementWithTerminatedQueries requests the StatementParser to parse the statement (from the command).

handleStatementWithTerminatedQueries puts EXECUTING status for the command.

In the end, handleStatementWithTerminatedQueries calls executeStatement with the parsed statement.
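
The two branches above can be sketched as follows. This is a minimal illustration, not the ksqlDB code: SketchCommand and HandleCommandSketch are hypothetical stand-ins, the plan and statement are plain strings, and the sketch assumes the two branches are mutually exclusive (a command with a plan is never parsed).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a Command: an optional plan plus the statement text.
final class SketchCommand {
  final Optional<String> plan;
  final String statement;

  SketchCommand(Optional<String> plan, String statement) {
    this.plan = plan;
    this.statement = statement;
  }
}

final class HandleCommandSketch {
  // Records the steps taken, in order, so the control flow is observable.
  final List<String> steps = new ArrayList<>();

  void handle(SketchCommand command) {
    if (command.plan.isPresent()) {
      // KsqlPlan branch: the plan is executed directly (assumed: no parsing).
      steps.add("executePlan:" + command.plan.get());
      return;
    }
    // Statement branch: PARSING status, parse, EXECUTING status, execute.
    steps.add("status:PARSING");
    steps.add("parse:" + command.statement);
    steps.add("status:EXECUTING");
    steps.add("executeStatement");
  }
}
```

Recording the steps in a list keeps the sketch free of any ksqlDB dependency while still showing the order in which the statuses are put.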

Executing KsqlPlan

void executePlan(
  Command command,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  KsqlPlan plan,
  Mode mode,
  long offset,
  boolean restoreInProgress)

executePlan builds a merged config.

executePlan puts EXECUTING status for the command with the message:

Executing statement

executePlan requests the KsqlEngine to execute the given KsqlPlan.

executePlan increments the internal counter of the queryIdGenerator.

If the execution result has a query (QueryMetadata) and the given mode is EXECUTE, executePlan requests the query to start.

In the end, executePlan puts SUCCESS final status with the command result or the following message:

Created query with ID [queryId]
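
The status handling and the choice of the final message can be sketched as below. This is an illustration under stated assumptions: ExecutePlanSketch, Result and the Mode values are hypothetical stand-ins, the execution result is passed in rather than produced by a KsqlEngine, and the config merging and the queryIdGenerator counter are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ExecutePlanSketch {
  // Illustrative modes; only EXECUTE starts a query in this sketch.
  enum Mode { EXECUTE, RESTORE }

  // Hypothetical execution result: an optional query ID and an optional command result.
  static final class Result {
    final Optional<String> queryId;
    final Optional<String> commandResult;

    Result(Optional<String> queryId, Optional<String> commandResult) {
      this.queryId = queryId;
      this.commandResult = commandResult;
    }
  }

  final List<String> statuses = new ArrayList<>();
  final List<String> startedQueries = new ArrayList<>();

  void executePlan(Result result, Mode mode) {
    statuses.add("EXECUTING: Executing statement");
    // A query is started only when there is one and the mode is EXECUTE.
    if (result.queryId.isPresent() && mode == Mode.EXECUTE) {
      startedQueries.add(result.queryId.get());
    }
    // Prefer the command result; otherwise fall back to the query ID message.
    String message = result.commandResult.orElseGet(
        () -> "Created query with ID " + result.queryId.get());
    statuses.add("SUCCESS: " + message);
  }
}
```

The sketch assumes that a result without a command result always carries a query; if neither is present, the fallback message cannot be built and `Optional.get` throws.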

Executing Statement

void executeStatement(
  PreparedStatement<?> statement,
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture)

executeStatement branches off based on the type of the statement.

TerminateQuery

For TerminateQuery, executeStatement calls terminateQuery and puts SUCCESS final status with the following message:

Query terminated.

ExecutableDdlStatement

For ExecutableDdlStatement, executeStatement calls throwUnsupportedStatementError.

CreateAsSelect

For CreateAsSelect, executeStatement calls throwUnsupportedStatementError.

InsertInto

For InsertInto, executeStatement calls throwUnsupportedStatementError.

AlterSystemProperty

For AlterSystemProperty, executeStatement requests the KsqlExecutionContext to alterSystemProperty and then to updateStreamsPropertiesAndRestartRuntime.

In the end, executeStatement puts SUCCESS final status with the following message:

System property [name] was set to [value].

Other Types

For all other types, executeStatement throws a KsqlException:

Unexpected statement type: [className]
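
The type-based branching can be sketched as a chain of instanceof checks. All the types below are hypothetical marker classes named after the statement types above, not the ksqlDB classes. UnsupportedOperationException and IllegalArgumentException are stand-ins for whatever throwUnsupportedStatementError and the KsqlException case actually throw, and "Unsupported statement" is a placeholder message.

```java
final class ExecuteStatementSketch {
  // Hypothetical marker types standing in for the parsed statement classes.
  interface Statement {}
  static final class TerminateQuery implements Statement {}
  static final class ExecutableDdlStatement implements Statement {}
  static final class CreateAsSelect implements Statement {}
  static final class InsertInto implements Statement {}
  static final class ListStreams implements Statement {} // a type with no dedicated branch

  static final class AlterSystemProperty implements Statement {
    final String name;
    final String value;

    AlterSystemProperty(String name, String value) {
      this.name = name;
      this.value = value;
    }
  }

  // Returns the message of the final SUCCESS status, or throws for rejected types.
  static String executeStatement(Statement statement) {
    if (statement instanceof TerminateQuery) {
      return "Query terminated.";
    }
    if (statement instanceof ExecutableDdlStatement
        || statement instanceof CreateAsSelect
        || statement instanceof InsertInto) {
      // Stand-in for throwUnsupportedStatementError.
      throw new UnsupportedOperationException("Unsupported statement");
    }
    if (statement instanceof AlterSystemProperty) {
      AlterSystemProperty alter = (AlterSystemProperty) statement;
      return "System property " + alter.name + " was set to " + alter.value + ".";
    }
    // Stand-in for the KsqlException thrown for all other types.
    throw new IllegalArgumentException(
        "Unexpected statement type: " + statement.getClass().getName());
  }
}
```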

putFinalStatus

void putFinalStatus(
  CommandId commandId,
  Optional<CommandStatusFuture> commandStatusFuture,
  CommandStatus status)

putFinalStatus associates the given CommandStatus with the CommandId (in the statusStore registry).

If the given CommandStatusFuture is available, putFinalStatus sets its final status.
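
A minimal sketch of this two-step bookkeeping follows. It assumes a plain map for the statusStore registry, strings for CommandId and CommandStatus, and a java.util.concurrent.CompletableFuture in place of CommandStatusFuture; FinalStatusSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class FinalStatusSketch {
  // Stand-in for the statusStore registry: command ID -> latest status.
  final Map<String, String> statusStore = new ConcurrentHashMap<>();

  void putFinalStatus(
      String commandId,
      Optional<CompletableFuture<String>> commandStatusFuture,
      String status) {
    // Step 1: always record the status under the command ID.
    statusStore.put(commandId, status);
    // Step 2: complete the future only if the caller supplied one.
    commandStatusFuture.ifPresent(future -> future.complete(status));
  }
}
```

Keeping the future optional lets the same method serve callers that wait for the outcome and callers that only need the registry updated.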


putFinalStatus is used when:

Accessing KsqlEngine

KsqlExecutionContext getKsqlEngine()

getKsqlEngine gives the KsqlEngine.


getKsqlEngine is used when:

Throwing IllegalStateException (IfNotConfigured)

void throwIfNotConfigured()

throwIfNotConfigured throws an IllegalStateException when there is no application.server (Kafka Streams) configuration property among the getKsqlStreamConfigProps (of the KsqlConfig of the KsqlEngine):

No initialized
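
The check amounts to a key lookup in the streams properties. The sketch below assumes the properties are available as a plain map; NotConfiguredSketch is a hypothetical name, while application.server is the Kafka Streams property key.

```java
import java.util.Map;

final class NotConfiguredSketch {
  // Kafka Streams property that must be present before commands are handled.
  static final String APPLICATION_SERVER = "application.server";

  static void throwIfNotConfigured(Map<String, Object> streamsProps) {
    if (!streamsProps.containsKey(APPLICATION_SERVER)) {
      // Message reproduced verbatim from the text above.
      throw new IllegalStateException("No initialized");
    }
  }
}
```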

throwIfNotConfigured is used when:

handleRestore

void handleRestore(
  QueuedCommand queuedCommand)

handleRestore...FIXME


handleRestore is used when:

Logging

Enable ALL logging level for io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor=ALL

Refer to Logging.