CommandRunner

CommandRunner is responsible for command processing for KsqlRestApplication.

Commands with KSQL statements are consumed from CommandQueue and processed on a separate Java thread using InteractiveStatementExecutor.

Creating Instance

CommandRunner takes the following to be created:

CommandRunner is created when:

InteractiveStatementExecutor

CommandRunner is given an InteractiveStatementExecutor when created.

The InteractiveStatementExecutor is used to process prior commands (handleRestore) and to execute QueuedCommands (handleStatement, while fetchAndRunCommands is running).

Metrics Group Prefix

CommandRunner is given a metrics group prefix when created (when building a KsqlRestApplication). The prefix is an empty string.

The prefix is used to create a CommandRunnerMetrics.

CommandRunnerMetrics

CommandRunner creates a CommandRunnerMetrics when created with the following:

The CommandRunnerMetrics is closed when CommandRunner is requested to close.

CommandQueue

CommandRunner is given a CommandQueue when created.

The CommandQueue is used when CommandRunner is requested for the following:

getCommandQueue

CommandQueue getCommandQueue()

getCommandQueue is used when:

Command Deserializer

CommandRunner is given a Deserializer (Apache Kafka) of Commands when created.

The Deserializer is used in the following:

Start Processing Queued Commands

void start()

start creates and starts a Java thread (on a single-threaded thread pool) to continuously fetch and run queued commands.

Every time fetchAndRunCommands is executed, the thread prints out the following TRACE message to the logs:

Polling for new writes to command topic
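The threading model described above can be sketched as follows. This is a minimal illustration, not the ksqlDB source: the class name `PollingLoopSketch`, the `closed` flag and the injected `fetchAndRun` callback are all hypothetical names introduced here. Only `Executors.newSingleThreadExecutor` and the other `java.util.concurrent` calls are real JDK APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for illustration; not the ksqlDB CommandRunner class.
public class PollingLoopSketch {
  // A single-threaded pool, so all commands are handled by one thread.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final Runnable fetchAndRun;

  public PollingLoopSketch(Runnable fetchAndRun) {
    this.fetchAndRun = fetchAndRun;
  }

  // Submits one long-running task that keeps polling until close() is called.
  public void start() {
    executor.execute(() -> {
      while (!closed.get()) {
        // The real thread logs "Polling for new writes to command topic"
        // at TRACE level at this point.
        fetchAndRun.run();
      }
    });
  }

  // Signals the loop to stop and waits (up to 5 seconds) for the thread to finish.
  public void close() throws InterruptedException {
    closed.set(true);
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

Using a single thread keeps command execution strictly sequential, which matters when later commands depend on the effects of earlier ones.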

start is used when:

Fetching and Running Queued Commands

void fetchAndRunCommands()

fetchAndRunCommands requests the CommandQueue for new commands (with a 5-second timeout).

If there are no commands, fetchAndRunCommands returns early (after also checking commandTopicExists).

fetchAndRunCommands checks for incompatible commands and then looks for a TERMINATE CLUSTER command (and terminates the cluster if one is found).

fetchAndRunCommands prints out the following DEBUG message to the logs:

Found [size] new writes to command topic

fetchAndRunCommands executes every command (one by one).
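The overall shape of this step (poll with a timeout, return early on an empty batch, then run commands in order) can be sketched as below. This is a simplified model under stated assumptions: `FetchAndRunSketch`, its `queue` and `executor` fields, and the use of plain strings for commands are hypothetical, and the incompatible-command and TERMINATE CLUSTER checks are deliberately left out.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified model of the polling step; names and types are
// illustrative and are not the ksqlDB classes.
public class FetchAndRunSketch {
  // Mirrors the 5-second timeout mentioned in the text.
  static final Duration NEW_COMMANDS_TIMEOUT = Duration.ofSeconds(5);

  private final Function<Duration, List<String>> queue;
  private final Consumer<String> executor;

  public FetchAndRunSketch(
      Function<Duration, List<String>> queue,
      Consumer<String> executor) {
    this.queue = queue;
    this.executor = executor;
  }

  // Runs one polling round and returns the number of commands executed.
  public int fetchAndRunCommands() {
    List<String> commands = queue.apply(NEW_COMMANDS_TIMEOUT);
    if (commands.isEmpty()) {
      return 0; // nothing new; leave early
    }
    // Stand-in for the DEBUG log message; the real code uses a logger.
    System.out.println("Found " + commands.size() + " new writes to command topic");
    for (String command : commands) {
      executor.accept(command); // one by one, in queue order
    }
    return commands.size();
  }
}
```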

Executing Statement

void executeStatement(
  QueuedCommand queuedCommand)

executeStatement prints out the following INFO message to the logs:

Executing statement [commandId]

executeStatement creates a Runnable (Java) that, when started, requests the InteractiveStatementExecutor to execute the command and then prints out the following INFO message to the logs:

Executed statement [commandId]

executeStatement executes the command (with retries and backoff until successful).
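"Retries and backoff until successful" can be illustrated with a small helper like the one below. It is a sketch of the general technique (exponential backoff with a cap), not the helper ksqlDB itself uses. The class `RetryWithBackoffSketch`, the method `runUntilSuccessful`, the doubling factor and the injectable `sleeper` are all assumptions made for the example.

```java
import java.util.function.LongConsumer;

// Hypothetical helper for illustration; not part of ksqlDB.
public class RetryWithBackoffSketch {

  // Runs the task until it completes without throwing. After each failure it
  // waits, doubling the wait time up to maxWaitMs. Returns the attempt count.
  // The sleeper is injected so callers (and tests) control how waiting happens.
  public static int runUntilSuccessful(
      Runnable task,
      long initialWaitMs,
      long maxWaitMs,
      LongConsumer sleeper) {
    long waitMs = initialWaitMs;
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        task.run();
        return attempts; // success: stop retrying
      } catch (RuntimeException e) {
        sleeper.accept(waitMs);                   // back off before the next try
        waitMs = Math.min(waitMs * 2, maxWaitMs); // exponential growth, capped
      }
    }
  }
}
```

In production-style use the `sleeper` would typically wrap `Thread.sleep`, and the task would be the Runnable that asks the statement executor to handle the command.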

processPriorCommands

void processPriorCommands(
  PersistentQueryCleanupImpl queryCleanup)

processPriorCommands...FIXME

processPriorCommands is used when:

Logging

Enable the ALL logging level for the io.confluent.ksql.rest.server.computation.CommandRunner logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.io.confluent.ksql.rest.server.computation.CommandRunner=ALL

Refer to Logging.