CommandRunner¶
CommandRunner is responsible for command processing for KsqlRestApplication.
Commands with KSQL statements are consumed from CommandQueue and processed on a separate Java thread using InteractiveStatementExecutor.

Creating Instance¶
CommandRunner takes the following to be created:
- InteractiveStatementExecutor
- CommandQueue
-
maxRetries -
ClusterTerminator -
ServerState - ksql.service.id
- commandRunnerHealth Timeout
- Metrics group prefix
- Command Deserializer
- Error Handler
-
KafkaTopicClient - Name of the Command Topic
-
Metrics
CommandRunner is created when:
KsqlRestApplicationutility is used to build a KsqlRestApplication (to create KsqlResource and KsqlRestApplication itself)
InteractiveStatementExecutor¶
CommandRunner is given an InteractiveStatementExecutor when created.
The InteractiveStatementExecutor is used when processing prior commands (to handleRestore) and executing QueuedCommands (while fetchAndRunCommands to handleStatement).
Metrics Group Prefix¶
CommandRunner is given a metrics group prefix when created (when building a KsqlRestApplication) and is an empty string.
The prefix is used to create a CommandRunnerMetrics.
CommandRunnerMetrics¶
CommandRunner creates a CommandRunnerMetrics when created with the following:
The CommandRunnerMetrics is closed when close.
CommandQueue¶
CommandRunner is given a CommandQueue when created.
The CommandQueue is used when CommandRunner is requested for the following:
getCommandQueue¶
CommandQueue getCommandQueue()
getCommandQueue is used when:
KsqlResourceis requested to configure and handleKsqlStatements
Command Deserializer¶
CommandRunner is given a Deserializer (Apache Kafka) of Commands when created.
The Deserializer is used in the following:
Start Processing Queued Commands¶
void start()
start creates and starts a Java thread (on a single-threaded thread pool) to continuously fetch and run queued commands.
Every time fetchAndRunCommands is executed, the thread prints out the following TRACE message to the logs:
Polling for new writes to command topic
start is used when:
KsqlRestApplicationis requested to initialize
Fetching and Running Queued Commands¶
void fetchAndRunCommands()
fetchAndRunCommands requests the CommandQueue for new commands (with 5 second timeout).
If there are no commands, fetchAndRunCommands leaves early (also checks if the commandTopicExists).
fetchAndRunCommands checks for incompatible commands and then tries to find TERMINATE CLUSTER command (to terminate the cluster if found).
fetchAndRunCommands prints out the following DEBUG message to the logs:
Found [size] new writes to command topic
fetchAndRunCommands executes every command (one by one).
Executing Statement¶
void executeStatement(
QueuedCommand queuedCommand)
executeStatement prints out the following INFO message to the logs:
Executing statement [commandId]
executeStatement creates a Runnable (Java) that, when started, requests the InteractiveStatementExecutor to execute the command and then prints out the following INFO message to the logs:
Executed statement [commandId]
executeStatement executes the command (with retries and backoff until successful).
processPriorCommands¶
void processPriorCommands(
PersistentQueryCleanupImpl queryCleanup)
processPriorCommands...FIXME
processPriorCommands is used when:
KsqlRestApplicationis requested to initialize
Logging¶
Enable ALL logging level for io.confluent.ksql.rest.server.computation.CommandRunner logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.computation.CommandRunner=ALL
Refer to Logging.