KsqlRestApplication¶
KsqlRestApplication is a ksqlDB REST API server.
KsqlRestApplication uses CommandRunner to process prior commands and then run new commands continuously (off a CommandQueue).
KsqlRestApplication can be started using the ksql-server-start shell script.
Creating Instance¶
KsqlRestApplication takes the following to be created:
- ServiceContext
- KsqlEngine
- KsqlConfig
- KsqlRestConfig
- CommandRunner
- CommandStore
- StatusResource
- StreamedQueryResource
- KsqlResource
- VersionCheckerAgent
- KsqlSecurityContextProvider
- KsqlSecurityExtension
- AuthenticationPlugin
- ServerState
- ProcessingLogContext
- KsqlServerPreconditions
- KsqlConfigurables
- RocksDB Config Setter Handler (Function of KsqlConfig)
- HeartbeatAgent
- LagReportingAgent
- Vert.x
- DenyListPropertyValidator
- PullQueryExecutorMetrics
- ScalablePushQueryMetrics
- LocalCommands
- QueryExecutor
When created, KsqlRestApplication prints out the following DEBUG message to the logs:
Creating instance of ksqlDB API server
In the end, KsqlRestApplication prints out the following DEBUG message to the logs:
ksqlDB API server instance created
KsqlRestApplication is created using the buildApplication utility.
KsqlEngine¶
KsqlRestApplication is given a KsqlEngine when created.
The KsqlEngine is active until shutdown.
The KsqlEngine is used in startAsync to create the REST API endpoints:
- Create WSQueryEndpoint (along with StatementParser)
- Create KsqlServerEndpoints for API server
KsqlResource¶
KsqlRestApplication is given a KsqlResource when created.
KsqlResource is used to create a KsqlServerEndpoints upon starting.
KsqlResource is also used to maybeCreateProcessingLogStream upon initializing.
CommandRunner¶
KsqlRestApplication is given a CommandRunner when created.
The CommandRunner is requested to processPriorCommands before starting command execution in initialize.
The CommandRunner is up and running until shutdown (when it is requested to close).
The CommandRunner is used to create a HealthCheckResource when KsqlRestApplication is created.
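The two phases described above (replay prior commands first, then keep taking new commands off a queue) can be sketched as follows. This is only an illustration: CommandRunnerSketch, its fields, and the use of plain strings as commands are hypothetical and are not the ksqlDB classes. Unlike the real runner, which keeps running until shutdown, the sketch drains the queue once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for CommandRunner; commands are plain strings here.
final class CommandRunnerSketch {
  private final List<String> priorCommands;  // assumed: commands already persisted before startup
  private final Queue<String> commandQueue;  // assumed: stand-in for CommandQueue
  private final List<String> executed = new ArrayList<>();

  CommandRunnerSketch(List<String> priorCommands, Queue<String> commandQueue) {
    this.priorCommands = priorCommands;
    this.commandQueue = commandQueue;
  }

  // Phase 1: replay everything that was issued before this server started.
  void processPriorCommands() {
    executed.addAll(priorCommands);
  }

  // Phase 2 (single pass): run every command currently waiting in the queue.
  void fetchAndRunCommands() {
    String command;
    while ((command = commandQueue.poll()) != null) {
      executed.add(command);
    }
  }

  List<String> executed() {
    return executed;
  }
}
```

Calling processPriorCommands before fetchAndRunCommands keeps the execution order equal to the order in which the commands were originally issued.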
KsqlConfigurables¶
KsqlRestApplication is given KsqlConfigurables when created.
All the given KsqlConfigurables are also given separately to create the KsqlRestApplication:
- KsqlResource
- StreamedQueryResource
- InteractiveStatementExecutor (that is part of the StatusResource)
KsqlConfigurables are requested to configure (with a KsqlConfig with an application.server property assigned) in startAsync.
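The configure step can be pictured as one config copy, extended with the application.server property, being handed to every configurable in turn. The sketch below assumes a hypothetical Configurable interface and a plain Map in place of KsqlConfig; neither is the actual ksqlDB API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConfigurablesSketch {
  // Hypothetical stand-in for KsqlConfigurable.
  interface Configurable {
    void configure(Map<String, Object> config);
  }

  // Test-friendly configurable that remembers the value it was configured with.
  static final class Recording implements Configurable {
    Object applicationServer;

    @Override
    public void configure(Map<String, Object> config) {
      applicationServer = config.get("application.server");
    }
  }

  // Returns a copy of the base config with application.server assigned.
  static Map<String, Object> withApplicationServer(Map<String, Object> base, String url) {
    Map<String, Object> copy = new HashMap<>(base);
    copy.put("application.server", url);
    return copy;
  }

  // Hands the same config to every configurable, in list order.
  static void configureAll(List<? extends Configurable> configurables, Map<String, Object> config) {
    for (Configurable configurable : configurables) {
      configurable.configure(config);
    }
  }
}
```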
Building KsqlRestApplication¶
KsqlRestApplication buildApplication(
  KsqlRestConfig restConfig,
  MetricCollectors metricCollectors)

KsqlRestApplication buildApplication(
  String metricsPrefix,
  KsqlRestConfig restConfig,
  ServerState serverState,
  Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
  int maxStatementRetries,
  ServiceContext serviceContext,
  Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
  ConnectClientFactory connectClientFactory,
  Vertx vertx,
  KsqlClient sharedClient,
  DefaultServiceContextFactory defaultServiceContextFactory,
  UserServiceContextFactory userServiceContextFactory,
  MetricCollectors metricCollectors)
buildApplication is used when:
- KsqlServerMain is requested for an Executable
Step 1. Vert.x¶
buildApplication creates a Vert.x subsystem.
Vert.x
Vert.x allows writing reactive applications on the JVM with support for HTTP, TCP, UDP, file system, asynchronous streams, etc.
buildApplication creates an internal KsqlClient.
buildApplication creates a KsqlSchemaRegistryClientFactory and a DefaultConnectClientFactory.
buildApplication determines the Kafka Cluster ID and reads the ksql.service.id configuration property (from the KsqlConfig).
buildApplication creates a KsqlRestConfig.
buildApplication ...FIXME
buildApplication creates a KsqlEngine.
buildApplication ...FIXME
buildApplication builds the name of the command topic (to create a CommandStore and a CommandRunner).
buildApplication creates a CommandStore.
Step x. InteractiveStatementExecutor¶
buildApplication creates an InteractiveStatementExecutor.
buildApplication ...FIXME
buildApplication creates a QueryExecutor and a StreamedQueryResource.
Step x. CommandRunner¶
buildApplication creates a CommandRunner (with the InteractiveStatementExecutor).
buildApplication ...FIXME
Step x. KsqlRestApplication¶
In the end, buildApplication creates a KsqlRestApplication.
startAsync¶
void startAsync()
startAsync is part of the Executable abstraction.
startAsync prints out the following DEBUG message to the logs:
Starting the ksqlDB API server
startAsync creates a ServerMetadataResource.
startAsync creates a StatementParser (with the KsqlEngine).
startAsync ...FIXME
startAsync creates a Server and starts it.
startAsync ...FIXME
In the end, startAsync prints out the following INFO message to the logs and then executes displayWelcomeMessage:
ksqlDB API server listening on [comma-separated listeners]
displayWelcomeMessage¶
void displayWelcomeMessage()
displayWelcomeMessage ...FIXME
In the end, displayWelcomeMessage prints out the following to the standard output:
Server [version] listening on [comma-separated listeners]
To access the KSQL CLI, run:
ksql [listener]
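As a rough illustration of how the message above could be assembled from a version string and a list of listeners, consider the sketch below. WelcomeMessageSketch is hypothetical, and both the ", " separator and the use of the first listener in the ksql line are assumptions, since the text above does not say which listener is printed.

```java
import java.util.List;

// Hypothetical helper; not part of KsqlRestApplication.
final class WelcomeMessageSketch {
  static String welcomeMessage(String version, List<String> listeners) {
    if (listeners.isEmpty()) {
      throw new IllegalArgumentException("at least one listener is required");
    }
    // Assumption: listeners are joined with ", ".
    String allListeners = String.join(", ", listeners);
    // Assumption: the CLI hint points at the first listener.
    return "Server " + version + " listening on " + allListeners + "\n"
        + "To access the KSQL CLI, run:\n"
        + "ksql " + listeners.get(0) + "\n";
  }
}
```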
startKsql¶
void startKsql(
  KsqlConfig ksqlConfigWithPort)
startKsql executes cleanupOldState before initializing (with the given KsqlConfig).
Initializing¶
void initialize(
  KsqlConfig configWithApplicationServer)
initialize executes the rocksDBConfigSetterHandler with the ksqlConfigNoPort.
initialize executes registerCommandTopic.
initialize requests the CommandStore to start.
initialize executes maybeCreateProcessingLogTopic with the following:
initialize requests the CommandRunner to processPriorCommands (with a new PersistentQueryCleanupImpl).
initialize requests the CommandRunner to start processing commands.
initialize executes maybeCreateProcessingLogStream.
initialize starts the heartbeatAgent and the lagReportingAgent agents (if specified).
In the end, initialize changes the ServerState to READY.
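The ordering matters here: the server becomes READY only after every earlier step has finished. A minimal sketch of that idea follows, with hypothetical names throughout (InitializeSketch, its State enum, and the no-op steps stand in for the real collaborators and are not the ksqlDB API).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InitializeSketch {
  // Hypothetical stand-in for ServerState; only READY is named in the text above.
  enum State { INITIALIZING, READY }

  private State state = State.INITIALIZING;
  private final List<String> completed = new ArrayList<>();

  State state() {
    return state;
  }

  List<String> completed() {
    return completed;
  }

  // Runs the steps in insertion order; READY is set only after all of them return.
  void initialize(Map<String, Runnable> steps) {
    for (Map.Entry<String, Runnable> step : steps.entrySet()) {
      step.getValue().run();
      completed.add(step.getKey());
    }
    state = State.READY;
  }

  // The steps listed above, as no-ops, in the order the text gives them.
  static Map<String, Runnable> defaultSteps(boolean agentsConfigured) {
    Runnable noop = () -> { };
    Map<String, Runnable> steps = new LinkedHashMap<>();
    steps.put("rocksDBConfigSetterHandler", noop);
    steps.put("registerCommandTopic", noop);
    steps.put("commandStore.start", noop);
    steps.put("maybeCreateProcessingLogTopic", noop);
    steps.put("commandRunner.processPriorCommands", noop);
    steps.put("commandRunner.start", noop);
    steps.put("maybeCreateProcessingLogStream", noop);
    if (agentsConfigured) {
      steps.put("agents.start", noop);
    }
    return steps;
  }
}
```

If any step throws, the loop stops and the state stays INITIALIZING, which is the property the sketch is meant to show.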
registerCommandTopic¶
void registerCommandTopic()
registerCommandTopic requests the CommandStore for the name of the command topic.
registerCommandTopic makes sure that the internal command topic is available in the Kafka cluster and in sync with the backup (if configured).
registerCommandTopic creates the command topic if it does not exist.
maybeCreateProcessingLogStream¶
void maybeCreateProcessingLogStream(
  ProcessingLogConfig processingLogConfig,
  KsqlConfig ksqlConfig,
  KsqlRestConfig restConfig,
  KsqlResource ksqlResource,
  ServiceContext serviceContext)
maybeCreateProcessingLogStream does nothing and returns immediately when ksql.logging.processing.stream.auto.create is turned off (false).
maybeCreateProcessingLogStream ...FIXME
Initializing HeartbeatAgent¶
Optional<HeartbeatAgent> initializeHeartbeatAgent(
  KsqlRestConfig restConfig,
  KsqlEngine ksqlEngine,
  ServiceContext serviceContext,
  Optional<LagReportingAgent> lagReportingAgent)
With ksql.heartbeat.enable enabled, initializeHeartbeatAgent builds a HeartbeatAgent.
Otherwise, initializeHeartbeatAgent does nothing and returns no HeartbeatAgent.
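The flag-gated construction described above maps naturally onto Optional. The sketch below assumes a plain Map in place of KsqlRestConfig and an empty Agent class in place of HeartbeatAgent; both are hypothetical and only the property name comes from the text.

```java
import java.util.Map;
import java.util.Optional;

final class HeartbeatSketch {
  // Hypothetical stand-in for HeartbeatAgent.
  static final class Agent { }

  // Builds an agent only when ksql.heartbeat.enable is set to true.
  static Optional<Agent> initializeHeartbeatAgent(Map<String, ?> restConfig) {
    Object value = restConfig.get("ksql.heartbeat.enable");
    boolean enabled = value != null && Boolean.parseBoolean(value.toString());
    return enabled ? Optional.of(new Agent()) : Optional.empty();
  }
}
```

Returning Optional.empty() instead of null lets callers start the agent with ifPresent and skip it otherwise.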
Logging¶
Enable ALL logging level for the io.confluent.ksql.rest.server.KsqlRestApplication logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.KsqlRestApplication=ALL
Refer to Logging.