Skip to content

KsqlRestApplication

KsqlRestApplication is a ksqlDB REST API server.

KsqlRestApplication uses CommandRunner to process prior commands and then run new commands continuously (off a CommandQueue).

KsqlRestApplication can be started using ksql-server-start shell script.

Creating Instance

KsqlRestApplication takes the following to be created:


When created, KsqlRestApplication prints out the following DEBUG message to the logs:

Creating instance of ksqlDB API server

In the end, KsqlRestApplication prints out the following DEBUG message to the logs:

ksqlDB API server instance created

KsqlRestApplication is created using buildApplication utility.

KsqlEngine

KsqlRestApplication is given a KsqlEngine when created.

The KsqlEngine is active until shutdown.

The KsqlEngine is used in startAsync to create the REST API endpoints:

KsqlResource

KsqlRestApplication is given a KsqlResource when created.

KsqlResource is used to create a KsqlServerEndpoints upon starting.

KsqlResource is also used to maybeCreateProcessingLogStream upon initializing.

CommandRunner

KsqlRestApplication is given a CommandRunner when created.

The CommandRunner is requested to processPriorCommands before starting command execution in initialize. The CommandRunner is up and running until shutdown (when it is requested to close).

The CommandRunner is used to create a HealthCheckResource when KsqlRestApplication is created.

KsqlConfigurables

KsqlRestApplication is given KsqlConfigurables when created.

All the given KsqlConfigurables are also given separately to create the KsqlRestApplication:

KsqlConfigurables are requested to configure (with a KsqlConfig with an application.server property assigned) in startAsync.

Building KsqlRestApplication

KsqlRestApplication buildApplication(
  KsqlRestConfig restConfig,
  MetricCollectors metricCollectors)
KsqlRestApplication buildApplication(
  String metricsPrefix,
  KsqlRestConfig restConfig,
  ServerState serverState,
  Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
  int maxStatementRetries,
  ServiceContext serviceContext,
  Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
  ConnectClientFactory connectClientFactory,
  Vertx vertx,
  KsqlClient sharedClient,
  DefaultServiceContextFactory defaultServiceContextFactory,
  UserServiceContextFactory userServiceContextFactory,
  MetricCollectors metricCollectors)

buildApplication is used when:

Step 1. Vert.x

buildApplication creates a Vert.x subsystem.

Vert.x

Vert.x allows writing reactive applications on the JVM with support for HTTP, TCP, UDP, file system, asynchronous streams. etc.

buildApplication creates an internal KsqlClient.

buildApplication creates a KsqlSchemaRegistryClientFactory and DefaultConnectClientFactory.

buildApplication determines the Kafka Cluster ID and reads the ksql.service.id configuration property (from the KsqlConfig).

buildApplication creates a KsqlRestConfig.

buildApplication...FIXME

buildApplication creates a KsqlEngine.

buildApplication...FIXME

buildApplication builds the name of the command topic (to create a CommandStore and a CommandRunner).

buildApplication creates a CommandStore.

Step x. InteractiveStatementExecutor

buildApplication creates an InteractiveStatementExecutor.

buildApplication...FIXME

buildApplication creates a QueryExecutor and a StreamedQueryResource.

Step x. CommandRunner

buildApplication creates a CommandRunner (with the InteractiveStatementExecutor).

buildApplication...FIXME

Step x. KsqlRestApplication

In the end, buildApplication creates a KsqlRestApplication.

startAsync

void startAsync()

startAsync is part of the Executable abstraction.


startAsync prints out the following DEBUG message to the logs:

Starting the ksqlDB API server

startAsync creates a ServerMetadataResource.

startAsync creates a StatementParser (with the KsqlEngine).

startAsync...FIXME

startAsync creates a Server and starts it.

startAsync...FIXME

In the end, startAsync prints out the following INFO message to the logs followed by displayWelcomeMessage.

ksqlDB API server listening on [comma-separated listeners]

displayWelcomeMessage

void displayWelcomeMessage()

displayWelcomeMessage...FIXME

In the end, displayWelcomeMessage prints out the following to the standard output:

Server [version] listening on [comma-separated listeners]

To access the KSQL CLI, run:
ksql [listener]

startKsql

void startKsql(
  KsqlConfig ksqlConfigWithPort)

startKsql cleanupOldState before initialization (with the given KsqlConfig).

Initializing

void initialize(
  KsqlConfig configWithApplicationServer)

initialize executes the rocksDBConfigSetterHandler with the ksqlConfigNoPort.

initialize registerCommandTopic.

initialize requests the CommandStore to start.

initialize maybeCreateProcessingLogTopic with the following:

initialize requests the CommandRunner to processPriorCommands (with a new PersistentQueryCleanupImpl).

initialize requests the CommandRunner to start processing commands.

initialize maybeCreateProcessingLogStream.

initialize starts the heartbeatAgent and the lagReportingAgent agents (if specified).

In the end, initialize changes the ServerState to READY.

registerCommandTopic

void registerCommandTopic()

registerCommandTopic requests the CommandStore for the name of the command topic.

registerCommandTopic makes sure that the internal command topic is available in the Kafka cluster and in sync with backup (if configured).

registerCommandTopic creates the command topic if not exists.

maybeCreateProcessingLogStream

void maybeCreateProcessingLogStream(
  ProcessingLogConfig processingLogConfig,
  KsqlConfig ksqlConfig,
  KsqlRestConfig restConfig,
  KsqlResource ksqlResource,
  ServiceContext serviceContext)

maybeCreateProcessingLogStream does nothing and returns immediately when ksql.logging.processing.stream.auto.create is turned off (false).

maybeCreateProcessingLogStream...FIXME

Initializing HeartbeatAgent

Optional<HeartbeatAgent> initializeHeartbeatAgent(
  KsqlRestConfig restConfig,
  KsqlEngine ksqlEngine,
  ServiceContext serviceContext,
  Optional<LagReportingAgent> lagReportingAgent)

With ksql.heartbeat.enable enabled, initializeHeartbeatAgent builds a HeartbeatAgent.

Otherwise, initializeHeartbeatAgent does nothing and returns no HeartbeatAgent.

Logging

Enable ALL logging level for io.confluent.ksql.rest.server.KsqlRestApplication logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.io.confluent.ksql.rest.server.KsqlRestApplication=ALL

Refer to Logging.