InteractiveStatementExecutor¶
InteractiveStatementExecutor
is used by CommandRunner to handle commands (in restore and regular operation modes).
Creating Instance¶
InteractiveStatementExecutor
takes the following to be created:
- ServiceContext
- KsqlEngine
- StatementParser
-
SpecificQueryIdGenerator
- Kafka Deserializer for Commands
InteractiveStatementExecutor
is created when:
KsqlRestApplication
is requested to build a KsqlRestApplication (to create a CommandRunner and aStatusResource
)
KsqlEngine¶
InteractiveStatementExecutor
is given a KsqlEngine when created.
The KsqlEngine
is used mainly for the following:
The KsqlEngine
is also used when:
ServiceContext¶
InteractiveStatementExecutor
is given a ServiceContext when created.
The ServiceContext
is used to execute a KsqlPlan (using the KsqlExecutionContext).
StatementParser¶
InteractiveStatementExecutor
is given a StatementParser when created.
The StatementParser
is used in handleStatementWithTerminatedQueries (to parse a KSQL statement).
Executing Queued Command¶
void handleStatement(
QueuedCommand queuedCommand)
handleStatement
handles the Command (from the given QueuedCommand
with EXECUTE
mode).
handleStatement
is used when:
CommandRunner
is requested to execute a statement
Handling Command¶
void handleStatementWithTerminatedQueries(
Command command,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
Mode mode,
long offset)
handleStatementWithTerminatedQueries
handles the given Command based on whether it has a KsqlPlan or just a statement.
handleStatementWithTerminatedQueries
is used when:
InteractiveStatementExecutor
is requested to handleStatement (withEXECUTE
mode) and handleRestore (withRESTORE
mode)
KsqlPlan¶
If the given Command has a KsqlPlan, handleStatementWithTerminatedQueries
executes the KsqlPlan.
Statement¶
handleStatementWithTerminatedQueries
puts PARSING status.
handleStatementWithTerminatedQueries
requests the StatementParser to parse the statement (from the command).
handleStatementWithTerminatedQueries
puts EXECUTING status.
handleStatementWithTerminatedQueries
executeStatement.
Executing KsqlPlan¶
void executePlan(
Command command,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
KsqlPlan plan,
Mode mode,
long offset,
boolean restoreInProgress)
executePlan
builds merged config.
executePlan
puts EXECUTING status for the command with the message:
Executing statement
executePlan
requests the KsqlEngine to execute the given KsqlPlan.
executePlan
increments queryIdGenerator
internal counter.
If this is a QueryMetadata and the given mode is EXECUTE
, executePlan
requests the query to start.
In the end, executePlan
puts SUCCESS final status with the command result or the following message:
Created query with ID [queryId]
Executing Statement¶
void executeStatement(
PreparedStatement<?> statement,
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture)
executeStatement
branches off based on the type of the statement.
TerminateQuery¶
For TerminateQuery, executeStatement
terminateQuery and puts SUCCESS final status with the following message:
Query terminated.
ExecutableDdlStatement¶
For ExecutableDdlStatement, executeStatement
throwUnsupportedStatementError.
CreateAsSelect¶
For CreateAsSelect, executeStatement
throwUnsupportedStatementError.
InsertInto¶
For InsertInto, executeStatement
throwUnsupportedStatementError.
AlterSystemProperty¶
For AlterSystemProperty, executeStatement
requests the KsqlExecutionContext to alterSystemProperty and then to updateStreamsPropertiesAndRestartRuntime.
In the end, executeStatement
puts SUCCESS final status with the following message:
System property [name] was set to [value].
Other Types¶
For all other types, executeStatement
throws a KsqlException
:
Unexpected statement type: [className]
putFinalStatus¶
void putFinalStatus(
CommandId commandId,
Optional<CommandStatusFuture> commandStatusFuture,
CommandStatus status)
putFinalStatus
associates the given CommandStatus
with the CommandId
(in statusStore registry).
If the given CommandStatusFuture
is available, putFinalStatus
sets its final status.
putFinalStatus
is used when:
InteractiveStatementExecutor
is requested to execute a command (KsqlPlan or Statement)
Accessing KsqlEngine¶
KsqlExecutionContext getKsqlEngine()
getKsqlEngine
gives the KsqlEngine.
getKsqlEngine
is used when:
CommandRunner
is requested to processPriorCommands
Throwing IllegalStateException (IfNotConfigured)¶
void throwIfNotConfigured()
throwIfNotConfigured
throws an IllegalStateException
when there is no application.server
(Kafka Streams) configuration property among the getKsqlStreamConfigProps (of the KsqlConfig of the KsqlEngine):
No initialized
throwIfNotConfigured
is used when:
InteractiveStatementExecutor
is requested to handleStatement and handleRestore
handleRestore¶
void handleRestore(
QueuedCommand queuedCommand)
handleRestore
...FIXME
handleRestore
is used when:
CommandRunner
is requested to processPriorCommands
Logging¶
Enable ALL
logging level for io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor
logger to see what happens inside.
Add the following line to log4j.properties
:
log4j.logger.io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor=ALL
Refer to Logging.