CommandRunner¶
CommandRunner is responsible for command processing for KsqlRestApplication.
Commands with KSQL statements are consumed from CommandQueue and processed on a separate Java thread using InteractiveStatementExecutor.
Creating Instance¶
CommandRunner takes the following to be created:
- InteractiveStatementExecutor
- CommandQueue
- maxRetries
- ClusterTerminator
- ServerState
- ksql.service.id
- commandRunnerHealth Timeout
- Metrics group prefix
- Command Deserializer
- Error Handler
- KafkaTopicClient
- Name of the Command Topic
- Metrics
CommandRunner is created when:
- KsqlRestApplication utility is used to build a KsqlRestApplication (to create KsqlResource and KsqlRestApplication itself)
InteractiveStatementExecutor¶
CommandRunner is given an InteractiveStatementExecutor when created.
The InteractiveStatementExecutor is used when processing prior commands (to handleRestore) and executing QueuedCommands (while fetchAndRunCommands to handleStatement).
Metrics Group Prefix¶
CommandRunner is given a metrics group prefix when created (when building a KsqlRestApplication). The prefix is an empty string.
The prefix is used to create a CommandRunnerMetrics.
CommandRunnerMetrics¶
CommandRunner creates a CommandRunnerMetrics when created with the following:
The CommandRunnerMetrics is closed when CommandRunner is requested to close.
CommandQueue¶
CommandRunner is given a CommandQueue when created.
The CommandQueue is used when CommandRunner is requested for the following:
getCommandQueue¶
CommandQueue getCommandQueue()
getCommandQueue is used when:
- KsqlResource is requested to configure and handleKsqlStatements
Command Deserializer¶
CommandRunner is given a Deserializer (Apache Kafka) of Commands when created.
The Deserializer is used in the following:
Start Processing Queued Commands¶
void start()
start creates and starts a Java thread (on a single-threaded thread pool) to continuously fetch and run queued commands.
Every time fetchAndRunCommands is executed, the thread prints out the following TRACE message to the logs:
Polling for new writes to command topic
start is used when:
- KsqlRestApplication is requested to initialize
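The threading pattern described for start can be sketched as follows. This is a minimal illustration, not the ksqlDB source: the class PollingLoopSketch, its closed flag, and the demo helper are hypothetical names, and the Runnable stands in for fetchAndRunCommands.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the polling loop; not the actual CommandRunner. */
final class PollingLoopSketch implements AutoCloseable {
  // A single-threaded pool, so all commands are processed on one thread.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Runnable fetchAndRun; // stands in for fetchAndRunCommands
  private volatile boolean closed = false;

  PollingLoopSketch(Runnable fetchAndRun) {
    this.fetchAndRun = fetchAndRun;
  }

  /** Submits the loop to the pool and returns immediately. */
  void start() {
    executor.execute(() -> {
      while (!closed && !Thread.currentThread().isInterrupted()) {
        fetchAndRun.run();
      }
    });
  }

  /** Stops the loop and waits briefly for the worker thread to finish. */
  @Override
  public void close() {
    closed = true;
    executor.shutdownNow();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Returns true if the task ran at least {@code times} times within 2 seconds. */
  static boolean demo(int times) {
    CountDownLatch latch = new CountDownLatch(times);
    PollingLoopSketch runner = new PollingLoopSketch(latch::countDown);
    runner.start();
    try {
      return latch.await(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      runner.close();
    }
  }
}
```

In this sketch the task returns immediately, so the loop spins. A task that blocks on a poll with a timeout, as fetchAndRunCommands is described to do, would pace the loop instead.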
Fetching and Running Queued Commands¶
void fetchAndRunCommands()
fetchAndRunCommands requests the CommandQueue for new commands (with 5-second timeout).
If there are no commands, fetchAndRunCommands leaves early (also checks if the commandTopicExists).
fetchAndRunCommands checks for incompatible commands and then tries to find TERMINATE CLUSTER command (to terminate the cluster if found).
fetchAndRunCommands prints out the following DEBUG message to the logs:
Found [size] new writes to command topic
fetchAndRunCommands executes every command (one by one).
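The control flow above can be sketched with plain Java collections. Everything here is hypothetical: FetchAndRunSketch, getNewCommands, and the use of String commands on a BlockingQueue are illustrative stand-ins for CommandQueue and QueuedCommand. The sketch also assumes that a terminate command short-circuits the rest of the batch, which the text above does not state, and it omits the check for incompatible commands.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of one polling iteration; not the actual CommandRunner. */
final class FetchAndRunSketch {
  static final String TERMINATE = "TERMINATE CLUSTER";

  /** Waits up to the timeout for one command, then drains whatever else is ready. */
  static List<String> getNewCommands(BlockingQueue<String> queue, long timeoutMs)
      throws InterruptedException {
    List<String> batch = new ArrayList<>();
    String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    if (first != null) {
      batch.add(first);
      queue.drainTo(batch);
    }
    return batch;
  }

  /** Returns the commands that were "executed" in this iteration, in order. */
  static List<String> fetchAndRun(BlockingQueue<String> queue, long timeoutMs) {
    List<String> executed = new ArrayList<>();
    List<String> commands;
    try {
      commands = getNewCommands(queue, timeoutMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return executed;
    }
    if (commands.isEmpty()) {
      return executed; // nothing to do: leave early
    }
    if (commands.contains(TERMINATE)) {
      // Assumption: termination takes priority over the rest of the batch.
      executed.add(TERMINATE);
      return executed;
    }
    for (String command : commands) {
      executed.add(command); // one by one, in queue order
    }
    return executed;
  }
}
```

Polling with a timeout rather than blocking indefinitely lets the surrounding loop notice a shutdown request between iterations.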
Executing Statement¶
void executeStatement(
QueuedCommand queuedCommand)
executeStatement prints out the following INFO message to the logs:
Executing statement [commandId]
executeStatement creates a Runnable (Java) that, when started, requests the InteractiveStatementExecutor to execute the command and then prints out the following INFO message to the logs:
Executed statement [commandId]
executeStatement executes the command (with retries and backoff until successful).
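A retry-until-successful loop with backoff can look roughly like the sketch below. RetrySketch and runUntilSuccess are hypothetical names, and the doubling (exponential) backoff with a cap is an assumption made for illustration. The text above only says that retries and backoff are used, not which schedule, and the sketch ignores the maxRetries setting.

```java
/** Hypothetical retry helper; not the utility ksqlDB actually uses. */
final class RetrySketch {

  /**
   * Runs the task until it completes without throwing, sleeping between
   * attempts. The wait doubles after each failure, up to maxWaitMs
   * (an assumed schedule). Returns the number of attempts made.
   */
  static int runUntilSuccess(Runnable task, long initialWaitMs, long maxWaitMs) {
    long waitMs = initialWaitMs;
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        task.run();
        return attempts; // success: stop retrying
      } catch (RuntimeException e) {
        try {
          Thread.sleep(waitMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while retrying", ie);
        }
        waitMs = Math.min(waitMs * 2, maxWaitMs);
      }
    }
  }
}
```

Wrapping the work in a Runnable, as executeStatement is described to do, is what makes it possible to hand the same unit of work to a generic retry helper like this one.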
processPriorCommands¶
void processPriorCommands(
PersistentQueryCleanupImpl queryCleanup)
processPriorCommands ...FIXME
processPriorCommands is used when:
- KsqlRestApplication is requested to initialize
Logging¶
Enable ALL logging level for io.confluent.ksql.rest.server.computation.CommandRunner logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.computation.CommandRunner=ALL
Refer to Logging.