KsqlRestApplication is a ksqlDB REST API server.

KsqlRestApplication uses CommandRunner to process prior commands and then run new commands continuously (off a CommandQueue).

KsqlRestApplication can be started using the ksql-server-start shell script.

Creating Instance

KsqlRestApplication takes the following to be created:

When created, KsqlRestApplication prints out the following DEBUG message to the logs:

Creating instance of ksqlDB API server

In the end, KsqlRestApplication prints out the following DEBUG message to the logs:

ksqlDB API server instance created

KsqlRestApplication is created using the buildApplication utility.


KsqlRestApplication is given a KsqlEngine when created.

The KsqlEngine is active until shutdown.

The KsqlEngine is used in startAsync to create the REST API endpoints:


KsqlRestApplication is given a KsqlResource when created.

KsqlResource is used to create a KsqlServerEndpoints upon starting.

KsqlResource is also passed to maybeCreateProcessingLogStream during initialization.


KsqlRestApplication is given a CommandRunner when created.

The CommandRunner is requested to processPriorCommands before starting command execution in initialize. The CommandRunner is up and running until shutdown (when it is requested to close).

The CommandRunner is used to create a HealthCheckResource when KsqlRestApplication is created.
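The lifecycle described above (replay prior commands, then process new commands off a queue until closed) can be illustrated with a minimal sketch. CommandRunnerSketch and all of its members are hypothetical stand-ins written for this page, not the ksqlDB CommandRunner API, and commands are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the replay-then-run lifecycle; not the ksqlDB class.
public class CommandRunnerSketch {
  // Sentinel value that tells the worker thread to stop (an assumption of this sketch).
  private static final String STOP = "__STOP__";

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final List<String> executed = Collections.synchronizedList(new ArrayList<>());
  private Thread worker;

  // Replays commands that were issued before this server instance started.
  public void processPriorCommands(List<String> priorCommands) {
    for (String command : priorCommands) {
      execute(command);
    }
  }

  // Starts a worker thread that executes queued commands until STOP is taken.
  public void start() {
    worker = new Thread(() -> {
      try {
        while (true) {
          String command = queue.take();
          if (STOP.equals(command)) {
            return;
          }
          execute(command);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "command-runner-sketch");
    worker.start();
  }

  public void enqueue(String command) {
    queue.add(command);
  }

  // Lets already queued commands finish, then waits for the worker to exit.
  public void close() {
    queue.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public List<String> executed() {
    synchronized (executed) {
      return new ArrayList<>(executed);
    }
  }

  private void execute(String command) {
    executed.add(command);
  }
}
```

In this sketch, calling processPriorCommands before start guarantees that replayed commands are applied before any newly queued ones, which mirrors the ordering the text describes for initialize.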


KsqlRestApplication is given KsqlConfigurables when created.

All the given KsqlConfigurables are also given separately to create the KsqlRestApplication:

KsqlConfigurables are requested to configure (with a KsqlConfig with an application.server property assigned) in startAsync.
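As a rough illustration of the configure step, the sketch below copies a base configuration, assigns application.server, and hands the result to every configurable. The Configurable interface, the RecordingResource class, and the use of a plain Map as the configuration are assumptions made for this example; they are not the ksqlDB KsqlConfigurable or KsqlConfig types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "configure every configurable with one shared config".
public class ConfigurablesSketch {
  // Stand-in for a configurable component (assumption, not the ksqlDB interface).
  public interface Configurable {
    void configure(Map<String, String> config);
  }

  // A component that simply remembers the application.server it was given.
  public static class RecordingResource implements Configurable {
    private String applicationServer;

    @Override
    public void configure(Map<String, String> config) {
      this.applicationServer = config.get("application.server");
    }

    public String applicationServer() {
      return applicationServer;
    }
  }

  // Returns a copy of the base config with application.server assigned.
  public static Map<String, String> withApplicationServer(
      Map<String, String> base, String serverUrl) {
    Map<String, String> copy = new HashMap<>(base);
    copy.put("application.server", serverUrl);
    return copy;
  }

  // Requests every configurable to configure itself with the same config.
  public static void configureAll(
      List<? extends Configurable> configurables, Map<String, String> config) {
    for (Configurable configurable : configurables) {
      configurable.configure(config);
    }
  }
}
```

Copying the base configuration rather than mutating it keeps the original (port-less) configuration available to other callers, which is one plausible reason for having two configuration variants.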

Building KsqlRestApplication

KsqlRestApplication buildApplication(
  KsqlRestConfig restConfig,
  MetricCollectors metricCollectors)

KsqlRestApplication buildApplication(
  String metricsPrefix,
  KsqlRestConfig restConfig,
  ServerState serverState,
  Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
  int maxStatementRetries,
  ServiceContext serviceContext,
  Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
  ConnectClientFactory connectClientFactory,
  Vertx vertx,
  KsqlClient sharedClient,
  DefaultServiceContextFactory defaultServiceContextFactory,
  UserServiceContextFactory userServiceContextFactory,
  MetricCollectors metricCollectors)

buildApplication is used when:

Step 1. Vert.x

buildApplication creates a Vert.x subsystem.


Vert.x allows writing reactive applications on the JVM with support for HTTP, TCP, UDP, file system, asynchronous streams, etc.

buildApplication creates an internal KsqlClient.

buildApplication creates a KsqlSchemaRegistryClientFactory and DefaultConnectClientFactory.

buildApplication determines the Kafka Cluster ID and reads the configuration property (from the KsqlConfig).

buildApplication creates a KsqlRestConfig.


buildApplication creates a KsqlEngine.


buildApplication builds the name of the command topic (to create a CommandStore and a CommandRunner).

buildApplication creates a CommandStore.

Step x. InteractiveStatementExecutor

buildApplication creates an InteractiveStatementExecutor.


buildApplication creates a QueryExecutor and a StreamedQueryResource.

Step x. CommandRunner

buildApplication creates a CommandRunner (with the InteractiveStatementExecutor).


Step x. KsqlRestApplication

In the end, buildApplication creates a KsqlRestApplication.


void startAsync()

startAsync is part of the Executable abstraction.

startAsync prints out the following DEBUG message to the logs:

Starting the ksqlDB API server

startAsync creates a ServerMetadataResource.

startAsync creates a StatementParser (with the KsqlEngine).


startAsync creates a Server and starts it.


In the end, startAsync prints out the following INFO message to the logs and then executes displayWelcomeMessage.

ksqlDB API server listening on [comma-separated listeners]


void displayWelcomeMessage()


In the end, displayWelcomeMessage prints out the following to the standard output:

Server [version] listening on [comma-separated listeners]

To access the KSQL CLI, run:
ksql [listener]


void startKsql(
  KsqlConfig ksqlConfigWithPort)

startKsql executes cleanupOldState before initialization (with the given KsqlConfig).


void initialize(
  KsqlConfig configWithApplicationServer)

initialize executes the rocksDBConfigSetterHandler with the ksqlConfigNoPort.

initialize executes registerCommandTopic.

initialize requests the CommandStore to start.

initialize executes maybeCreateProcessingLogTopic with the following:

initialize requests the CommandRunner to processPriorCommands (with a new PersistentQueryCleanupImpl).

initialize requests the CommandRunner to start processing commands.

initialize executes maybeCreateProcessingLogStream.

initialize starts the heartbeatAgent and the lagReportingAgent agents (if specified).

In the end, initialize changes the ServerState to READY.
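The order of the steps above can be summarized in a small model. InitializeSketch is hypothetical: it only records step names in the order the text lists them and flips a state flag at the end. The INITIALIZING starting state and the representation of the agents as optional names are assumptions of this sketch, not details taken from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the initialize sequence; not the ksqlDB implementation.
public class InitializeSketch {
  // READY comes from the text; INITIALIZING is an assumed starting state.
  public enum State { INITIALIZING, READY }

  private final List<String> steps = new ArrayList<>();
  private final Optional<String> heartbeatAgent;
  private final Optional<String> lagReportingAgent;
  private State state = State.INITIALIZING;

  public InitializeSketch(
      Optional<String> heartbeatAgent, Optional<String> lagReportingAgent) {
    this.heartbeatAgent = heartbeatAgent;
    this.lagReportingAgent = lagReportingAgent;
  }

  // Records each step in the order described in the text.
  public void initialize() {
    steps.add("rocksDBConfigSetterHandler");
    steps.add("registerCommandTopic");
    steps.add("CommandStore.start");
    steps.add("maybeCreateProcessingLogTopic");
    steps.add("CommandRunner.processPriorCommands");
    steps.add("CommandRunner.start");
    steps.add("maybeCreateProcessingLogStream");
    // Agents are started only if they were specified.
    heartbeatAgent.ifPresent(name -> steps.add("start " + name));
    lagReportingAgent.ifPresent(name -> steps.add("start " + name));
    // Only after every step has run is the server marked READY.
    state = State.READY;
  }

  public List<String> steps() {
    return new ArrayList<>(steps);
  }

  public State state() {
    return state;
  }
}
```

Marking the server READY as the very last step means that clients checking the state never observe READY while prior commands are still being replayed.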


void registerCommandTopic()

registerCommandTopic requests the CommandStore for the name of the command topic.

registerCommandTopic makes sure that the internal command topic is available in the Kafka cluster and in sync with backup (if configured).

registerCommandTopic creates the command topic if it does not exist.


void maybeCreateProcessingLogStream(
  ProcessingLogConfig processingLogConfig,
  KsqlConfig ksqlConfig,
  KsqlRestConfig restConfig,
  KsqlResource ksqlResource,
  ServiceContext serviceContext)

maybeCreateProcessingLogStream does nothing and returns immediately when the corresponding configuration property is turned off (false).


Initializing HeartbeatAgent

Optional<HeartbeatAgent> initializeHeartbeatAgent(
  KsqlRestConfig restConfig,
  KsqlEngine ksqlEngine,
  ServiceContext serviceContext,
  Optional<LagReportingAgent> lagReportingAgent)

With ksql.heartbeat.enable enabled, initializeHeartbeatAgent builds a HeartbeatAgent.

Otherwise, initializeHeartbeatAgent does nothing and returns no HeartbeatAgent.
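The enabled-or-empty behaviour can be sketched as follows. HeartbeatAgentSketch, its nested Agent class, and the use of a plain Map for the REST configuration are hypothetical; only the ksql.heartbeat.enable property name comes from the text. Treating a missing property as false is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of a feature-flagged optional agent; not the ksqlDB code.
public class HeartbeatAgentSketch {
  public static final String HEARTBEAT_ENABLE = "ksql.heartbeat.enable";

  // Stand-in for the agent; the real builder takes more collaborators.
  public static final class Agent {
  }

  // Builds an agent only when the flag is enabled; otherwise returns empty.
  public static Optional<Agent> initializeHeartbeatAgent(Map<String, String> restConfig) {
    // Assumption: a missing property is treated as "false".
    boolean enabled =
        Boolean.parseBoolean(restConfig.getOrDefault(HEARTBEAT_ENABLE, "false"));
    return enabled ? Optional.of(new Agent()) : Optional.empty();
  }
}
```

Returning an Optional lets callers such as initialize start the agent with ifPresent instead of null checks.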


Enable the ALL logging level for the logger to see what happens inside.

Add the following line to

Refer to Logging.