InteractiveStatementExecutor¶
InteractiveStatementExecutor is used by CommandRunner to handle commands (in restore and regular operation modes).
Creating Instance¶
InteractiveStatementExecutor takes the following to be created:
- ServiceContext
- KsqlEngine
- StatementParser
- SpecificQueryIdGenerator
- Kafka Deserializer for Commands
InteractiveStatementExecutor is created when:
- KsqlRestApplication is requested to build a KsqlRestApplication (to create a CommandRunner and a StatusResource)
KsqlEngine¶
InteractiveStatementExecutor is given a KsqlEngine when created.
The KsqlEngine is used mainly for the following:
The KsqlEngine is also used when:
ServiceContext¶
InteractiveStatementExecutor is given a ServiceContext when created.
The ServiceContext is used to execute a KsqlPlan (using the KsqlExecutionContext).
StatementParser¶
InteractiveStatementExecutor is given a StatementParser when created.
The StatementParser is used in handleStatementWithTerminatedQueries (to parse a KSQL statement).
Executing Queued Command¶
void handleStatement(
QueuedCommand queuedCommand)
handleStatement handles the Command (from the given QueuedCommand with EXECUTE mode).
handleStatement is used when:
- CommandRunner is requested to execute a statement
Handling Command¶
void handleStatementWithTerminatedQueries(
Command command,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
Mode mode,
long offset)
handleStatementWithTerminatedQueries handles the given Command based on whether it has a KsqlPlan or just a statement.
handleStatementWithTerminatedQueries is used when:
- InteractiveStatementExecutor is requested to handleStatement (with EXECUTE mode) and handleRestore (with RESTORE mode)
KsqlPlan¶
If the given Command has a KsqlPlan, handleStatementWithTerminatedQueries executes the KsqlPlan.
Statement¶
handleStatementWithTerminatedQueries puts PARSING status.
handleStatementWithTerminatedQueries requests the StatementParser to parse the statement (from the command).
handleStatementWithTerminatedQueries puts EXECUTING status.
In the end, handleStatementWithTerminatedQueries executes the parsed statement (executeStatement).
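The two branches above can be sketched as follows. This is a minimal illustration, not the ksqlDB source: SketchCommand and HandleCommandSketch are hypothetical stand-ins, and each step is recorded as a string rather than performed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a Command: it carries a plan, or only statement text.
final class SketchCommand {
    final Optional<String> plan;
    final String statement;

    SketchCommand(Optional<String> plan, String statement) {
        this.plan = plan;
        this.statement = statement;
    }
}

final class HandleCommandSketch {
    // Returns the ordered steps taken, so the control flow is easy to inspect.
    static List<String> handle(SketchCommand command) {
        List<String> steps = new ArrayList<>();
        if (command.plan.isPresent()) {
            // Plan branch: the plan is executed directly, with no parsing step.
            steps.add("executePlan");
            return steps;
        }
        // Statement branch: PARSING status, parse, EXECUTING status, execute.
        steps.add("status:PARSING");
        steps.add("parse");
        steps.add("status:EXECUTING");
        steps.add("executeStatement");
        return steps;
    }
}
```

Calling `HandleCommandSketch.handle` with a plan-carrying command yields a single `executePlan` step, while a statement-only command yields the four statement steps in order.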
Executing KsqlPlan¶
void executePlan(
Command command,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
KsqlPlan plan,
Mode mode,
long offset,
boolean restoreInProgress)
executePlan builds merged config.
executePlan puts EXECUTING status for the command with the message:
Executing statement
executePlan requests the KsqlEngine to execute the given KsqlPlan.
executePlan increments the internal counter of the queryIdGenerator.
If this is a QueryMetadata and the given mode is EXECUTE, executePlan requests the query to start.
In the end, executePlan puts SUCCESS final status with the command result or the following message:
Created query with ID [queryId]
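The status flow of executePlan can be sketched as below. This is an illustration under assumptions: ExecutePlanSketch, PlanResult and the string-based statuses are hypothetical, and the config merging and the query ID generator update are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ExecutePlanSketch {
    enum Mode { RESTORE, EXECUTE }

    // Hypothetical outcome of executing a plan: an optional query and an optional result message.
    static final class PlanResult {
        final Optional<String> queryId;
        final Optional<String> commandResult;

        PlanResult(Optional<String> queryId, Optional<String> commandResult) {
            this.queryId = queryId;
            this.commandResult = commandResult;
        }
    }

    final List<String> statuses = new ArrayList<>();
    boolean queryStarted = false;

    void executePlan(PlanResult result, Mode mode) {
        statuses.add("EXECUTING: Executing statement");
        // A query is started only in EXECUTE mode, not while restoring.
        if (result.queryId.isPresent() && mode == Mode.EXECUTE) {
            queryStarted = true;
        }
        // Prefer the command result; otherwise fall back to the query ID message.
        String message = result.commandResult.orElseGet(
            () -> "Created query with ID " + result.queryId.orElseThrow(
                () -> new IllegalStateException("neither a command result nor a query")));
        statuses.add("SUCCESS: " + message);
    }
}
```

In this sketch a plan executed in RESTORE mode still ends with a SUCCESS status, but the query is not started.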
Executing Statement¶
void executeStatement(
PreparedStatement<?> statement,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture)
executeStatement branches off based on the type of the statement.
TerminateQuery¶
For TerminateQuery, executeStatement terminates the query (terminateQuery) and puts SUCCESS final status with the following message:
Query terminated.
ExecutableDdlStatement¶
For ExecutableDdlStatement, executeStatement reports an unsupported statement error (throwUnsupportedStatementError).
CreateAsSelect¶
For CreateAsSelect, executeStatement reports an unsupported statement error (throwUnsupportedStatementError).
InsertInto¶
For InsertInto, executeStatement reports an unsupported statement error (throwUnsupportedStatementError).
AlterSystemProperty¶
For AlterSystemProperty, executeStatement requests the KsqlExecutionContext to alterSystemProperty and then to updateStreamsPropertiesAndRestartRuntime.
In the end, executeStatement puts SUCCESS final status with the following message:
System property [name] was set to [value].
Other Types¶
For all other types, executeStatement throws a KsqlException:
Unexpected statement type: [className]
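The type-based dispatch described in this section could look roughly like the sketch below. All types here are hypothetical stand-ins nested in ExecuteStatementSketch, the standard Java exceptions replace KsqlException, and the text of the unsupported-statement error is a placeholder since the page does not give it.

```java
final class ExecuteStatementSketch {
    interface Statement {}
    static final class TerminateQuery implements Statement {}
    static final class ExecutableDdl implements Statement {}
    static final class CreateAsSelect implements Statement {}
    static final class InsertInto implements Statement {}
    static final class Other implements Statement {}

    static final class AlterSystemProperty implements Statement {
        final String name;
        final String value;

        AlterSystemProperty(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // Returns the SUCCESS message, or throws for unsupported and unexpected types.
    static String execute(Statement statement) {
        if (statement instanceof TerminateQuery) {
            return "Query terminated.";
        }
        if (statement instanceof ExecutableDdl
                || statement instanceof CreateAsSelect
                || statement instanceof InsertInto) {
            // Placeholder message; stands in for throwUnsupportedStatementError.
            throw new UnsupportedOperationException(
                "Unsupported statement: " + statement.getClass().getSimpleName());
        }
        if (statement instanceof AlterSystemProperty) {
            AlterSystemProperty alter = (AlterSystemProperty) statement;
            return "System property " + alter.name + " was set to " + alter.value + ".";
        }
        throw new IllegalArgumentException(
            "Unexpected statement type: " + statement.getClass().getName());
    }
}
```

The order of the checks matters only if the types overlap; in this sketch they are disjoint, so the final throw is reached for anything not listed.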
putFinalStatus¶
void putFinalStatus(
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
CommandStatus status)
putFinalStatus associates the given CommandStatus with the CommandId (in statusStore registry).
If the given CommandStatusFuture is available, putFinalStatus sets its final status.
putFinalStatus is used when:
- InteractiveStatementExecutor is requested to execute a command (KsqlPlan or Statement)
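A minimal sketch of the same idea, assuming a plain map as the status registry and a CompletableFuture in place of CommandStatusFuture (both are stand-ins chosen for illustration, with strings for command IDs and statuses):

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class FinalStatusSketch {
    // Hypothetical registry: command ID to latest status.
    final Map<String, String> statusStore = new ConcurrentHashMap<>();

    void putFinalStatus(
            String commandId,
            Optional<CompletableFuture<String>> statusFuture,
            String status) {
        // Always record the status in the registry.
        statusStore.put(commandId, status);
        // Complete the future only when a caller is waiting on one.
        statusFuture.ifPresent(future -> future.complete(status));
    }
}
```

The registry is updated whether or not a future is present, so a status remains available for lookup by command ID even when nobody is waiting on it.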
Accessing KsqlEngine¶
KsqlExecutionContext getKsqlEngine()
getKsqlEngine gives the KsqlEngine.
getKsqlEngine is used when:
- CommandRunner is requested to processPriorCommands
Throwing IllegalStateException (IfNotConfigured)¶
void throwIfNotConfigured()
throwIfNotConfigured throws an IllegalStateException when there is no application.server (Kafka Streams) configuration property among the getKsqlStreamConfigProps (of the KsqlConfig of the KsqlEngine):
No initialized
throwIfNotConfigured is used when:
- InteractiveStatementExecutor is requested to handleStatement and handleRestore
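The guard amounts to a key lookup in the Kafka Streams properties. The sketch below assumes the properties are available as a plain map; ConfigGuardSketch is a hypothetical name, and the example URL in the usage is illustrative only.

```java
import java.util.Map;

final class ConfigGuardSketch {
    // Fails fast when the Kafka Streams application.server property is absent.
    static void throwIfNotConfigured(Map<String, ?> streamsProps) {
        if (!streamsProps.containsKey("application.server")) {
            throw new IllegalStateException("No initialized");
        }
    }
}
```

For example, a map containing `application.server` set to `http://localhost:8088` passes the check, while an empty map triggers the exception.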
handleRestore¶
void handleRestore(
QueuedCommand queuedCommand)
handleRestore...FIXME
handleRestore is used when:
- CommandRunner is requested to processPriorCommands
Logging¶
Enable ALL logging level for io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor=ALL
Refer to Logging.