KsqlRestApplication¶
KsqlRestApplication is a ksqlDB REST API server.
KsqlRestApplication uses CommandRunner to process prior commands and then run new commands continuously (off a CommandQueue).
KsqlRestApplication can be started using the ksql-server-start shell script.
Creating Instance¶
KsqlRestApplication takes the following to be created:
- ServiceContext
- KsqlEngine
- KsqlConfig
- KsqlRestConfig
- CommandRunner
- CommandStore
- StatusResource
- StreamedQueryResource
- KsqlResource
- VersionCheckerAgent
- KsqlSecurityContextProvider
- KsqlSecurityExtension
- AuthenticationPlugin
- ServerState
- ProcessingLogContext
- KsqlServerPreconditions
- KsqlConfigurables
- RocksDB Config Setter Handler (Function of KsqlConfig)
- HeartbeatAgent
- LagReportingAgent
- Vert.x
- DenyListPropertyValidator
- PullQueryExecutorMetrics
- ScalablePushQueryMetrics
- LocalCommands
- QueryExecutor
When created, KsqlRestApplication prints out the following DEBUG message to the logs:
Creating instance of ksqlDB API server
In the end, KsqlRestApplication prints out the following DEBUG message to the logs:
ksqlDB API server instance created
KsqlRestApplication is created using buildApplication utility.
KsqlEngine¶
KsqlRestApplication is given a KsqlEngine when created.
The KsqlEngine is active until shutdown.
The KsqlEngine is used in startAsync to create the REST API endpoints:
- Create WSQueryEndpoint (along with StatementParser)
- Create KsqlServerEndpoints for API server
KsqlResource¶
KsqlRestApplication is given a KsqlResource when created.
KsqlResource is used to create a KsqlServerEndpoints upon starting.
KsqlResource is also used to maybeCreateProcessingLogStream upon initializing.
CommandRunner¶
KsqlRestApplication is given a CommandRunner when created.
The CommandRunner is requested to processPriorCommands before starting command execution in initialize. The CommandRunner is up and running until shutdown (when it is requested to close).
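The replay-then-run lifecycle described above can be sketched in a few lines. This is a minimal, single-threaded illustration and not the ksqlDB implementation: CommandRunnerSketch, fetchAndRunNewCommands and the string commands are hypothetical names, and a plain java.util.Queue stands in for the CommandQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a command runner; not the ksqlDB class. */
final class CommandRunnerSketch {
  private final Queue<String> queue;                     // stands in for the CommandQueue
  private final List<String> executed = new ArrayList<>();
  private boolean closed;

  CommandRunnerSketch(Queue<String> queue) {
    this.queue = queue;
  }

  /** Replays every command that is already in the queue at startup. */
  void processPriorCommands() {
    drain("restore");
  }

  /** One iteration of the continuous loop: runs commands that arrived since the last call. */
  void fetchAndRunNewCommands() {
    if (closed) {
      throw new IllegalStateException("runner is closed");
    }
    drain("run");
  }

  /** Marks the runner as closed so that no further commands are executed. */
  void close() {
    closed = true;
  }

  List<String> executed() {
    return executed;
  }

  private void drain(String phase) {
    String command;
    while ((command = queue.poll()) != null) {
      executed.add(phase + ": " + command);
    }
  }

  public static void main(String[] args) {
    Queue<String> queue = new ArrayDeque<>(List.of("cmd-1", "cmd-2"));
    CommandRunnerSketch runner = new CommandRunnerSketch(queue);
    runner.processPriorCommands();      // replay what was written before this server started
    queue.add("cmd-3");                 // a new command arrives while the server is running
    runner.fetchAndRunNewCommands();
    runner.close();
    System.out.println(runner.executed());
    // prints [restore: cmd-1, restore: cmd-2, run: cmd-3]
  }
}
```

Keeping the replay phase separate from the steady-state loop makes it possible to treat restored commands differently from new ones.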
The CommandRunner is used to create a HealthCheckResource when KsqlRestApplication is created.
KsqlConfigurables¶
KsqlRestApplication is given KsqlConfigurables when created.
Each of the given KsqlConfigurables is also passed in separately when KsqlRestApplication is created:
- KsqlResource
- StreamedQueryResource
- InteractiveStatementExecutor (that is part of the StatusResource)
KsqlConfigurables are requested to configure (with a KsqlConfig with an application.server property assigned) in startAsync.
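As a rough illustration of the configure step, the sketch below hands one configuration (with application.server assigned) to a list of components. All types here are hypothetical stand-ins, the configuration is a plain Map rather than a KsqlConfig, and the URL is only an example value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; none of these types are the ksqlDB classes. */
final class ConfigurablesSketch {

  /** Hypothetical stand-in for a component that accepts configuration after construction. */
  interface Configurable {
    void configure(Map<String, String> config);
  }

  /** Records the application.server value it was configured with. */
  static final class RecordingComponent implements Configurable {
    final String name;
    String applicationServer;

    RecordingComponent(String name) {
      this.name = name;
    }

    @Override
    public void configure(Map<String, String> config) {
      applicationServer = config.get("application.server");
    }
  }

  /** Returns a copy of the base config with application.server assigned. */
  static Map<String, String> withApplicationServer(Map<String, String> base, String url) {
    Map<String, String> copy = new HashMap<>(base);
    copy.put("application.server", url);
    return copy;
  }

  /** Hands the same config to every component, in list order. */
  static void configureAll(List<? extends Configurable> components, Map<String, String> config) {
    for (Configurable component : components) {
      component.configure(config);
    }
  }

  public static void main(String[] args) {
    List<RecordingComponent> components = new ArrayList<>();
    components.add(new RecordingComponent("KsqlResource"));
    components.add(new RecordingComponent("StreamedQueryResource"));
    components.add(new RecordingComponent("InteractiveStatementExecutor"));

    Map<String, String> base = new HashMap<>();
    Map<String, String> config = withApplicationServer(base, "http://localhost:8088");
    configureAll(components, config);

    for (RecordingComponent component : components) {
      System.out.println(component.name + " -> " + component.applicationServer);
    }
  }
}
```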
Building KsqlRestApplication¶
KsqlRestApplication buildApplication(
KsqlRestConfig restConfig,
MetricCollectors metricCollectors)
KsqlRestApplication buildApplication(
String metricsPrefix,
KsqlRestConfig restConfig,
ServerState serverState,
Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
int maxStatementRetries,
ServiceContext serviceContext,
Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
ConnectClientFactory connectClientFactory,
Vertx vertx,
KsqlClient sharedClient,
DefaultServiceContextFactory defaultServiceContextFactory,
UserServiceContextFactory userServiceContextFactory,
MetricCollectors metricCollectors)
buildApplication is used when:
- KsqlServerMain is requested for an Executable
Step 1. Vert.x¶
buildApplication creates a Vert.x subsystem.
Vert.x
Vert.x allows writing reactive applications on the JVM with support for HTTP, TCP, UDP, file system, asynchronous streams, etc.
buildApplication creates an internal KsqlClient.
buildApplication creates a KsqlSchemaRegistryClientFactory and DefaultConnectClientFactory.
buildApplication determines the Kafka Cluster ID and reads the ksql.service.id configuration property (from the KsqlConfig).
buildApplication creates a KsqlRestConfig.
buildApplication...FIXME
buildApplication creates a KsqlEngine.
buildApplication...FIXME
buildApplication builds the name of the command topic (to create a CommandStore and a CommandRunner).
buildApplication creates a CommandStore.
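To make the naming step concrete, here is a small sketch of deriving a command topic name from the ksql.service.id value. The prefix, suffix and separator are assumptions based on the commonly seen default topic name _confluent-ksql-default__command_topic (with default_ as the service ID); they are not taken from this page, so check them against the ksqlDB sources before relying on them.

```java
/** Hypothetical helper; the prefix and suffix are assumptions, not read from ksqlDB. */
final class CommandTopicNameSketch {
  static final String INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";  // assumed prefix
  static final String COMMAND_TOPIC_SUFFIX = "command_topic";      // assumed suffix

  /** Joins prefix, service ID and suffix into a topic name. */
  static String commandTopicName(String serviceId) {
    return INTERNAL_TOPIC_PREFIX + serviceId + "_" + COMMAND_TOPIC_SUFFIX;
  }

  public static void main(String[] args) {
    System.out.println(commandTopicName("default_"));
    // prints _confluent-ksql-default__command_topic
  }
}
```

Because the service ID is part of the name, two servers with different ksql.service.id values would use different command topics under this scheme.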
Step x. InteractiveStatementExecutor¶
buildApplication creates an InteractiveStatementExecutor.
buildApplication...FIXME
buildApplication creates a QueryExecutor and a StreamedQueryResource.
Step x. CommandRunner¶
buildApplication creates a CommandRunner (with the InteractiveStatementExecutor).
buildApplication...FIXME
Step x. KsqlRestApplication¶
In the end, buildApplication creates a KsqlRestApplication.
startAsync¶
void startAsync()
startAsync is part of the Executable abstraction.
startAsync prints out the following DEBUG message to the logs:
Starting the ksqlDB API server
startAsync creates a ServerMetadataResource.
startAsync creates a StatementParser (with the KsqlEngine).
startAsync...FIXME
startAsync creates a Server and starts it.
startAsync...FIXME
In the end, startAsync prints out the following INFO message to the logs and then calls displayWelcomeMessage.
ksqlDB API server listening on [comma-separated listeners]
displayWelcomeMessage¶
void displayWelcomeMessage()
displayWelcomeMessage...FIXME
In the end, displayWelcomeMessage prints out the following to the standard output:
Server [version] listening on [comma-separated listeners]
To access the KSQL CLI, run:
ksql [listener]
startKsql¶
void startKsql(
KsqlConfig ksqlConfigWithPort)
startKsql calls cleanupOldState before initialize (with the given KsqlConfig).
Initializing¶
void initialize(
KsqlConfig configWithApplicationServer)
initialize executes the rocksDBConfigSetterHandler with the ksqlConfigNoPort.
initialize calls registerCommandTopic.
initialize requests the CommandStore to start.
initialize calls maybeCreateProcessingLogTopic with the following:
initialize requests the CommandRunner to processPriorCommands (with a new PersistentQueryCleanupImpl).
initialize requests the CommandRunner to start processing commands.
initialize calls maybeCreateProcessingLogStream.
initialize starts the heartbeatAgent and the lagReportingAgent agents (if specified).
In the end, initialize changes the ServerState to READY.
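The ordering of these steps can be summarized as a sketch. The class, the State enum and the use of Runnable for the agents are hypothetical; the step names are simply labels taken from the description above, and each step is recorded rather than executed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Illustrative ordering of the initialization steps; not the ksqlDB implementation. */
final class InitializeSketch {
  /** Hypothetical two-value state; the real ServerState may have more states. */
  enum State { INITIALIZING, READY }

  private final List<String> steps = new ArrayList<>();
  private final Optional<Runnable> heartbeatAgent;      // Runnable stands in for an agent
  private final Optional<Runnable> lagReportingAgent;
  private State state = State.INITIALIZING;

  InitializeSketch(Optional<Runnable> heartbeatAgent, Optional<Runnable> lagReportingAgent) {
    this.heartbeatAgent = heartbeatAgent;
    this.lagReportingAgent = lagReportingAgent;
  }

  void initialize() {
    steps.add("rocksDBConfigSetterHandler");
    steps.add("registerCommandTopic");
    steps.add("commandStore.start");
    steps.add("maybeCreateProcessingLogTopic");
    steps.add("commandRunner.processPriorCommands");
    steps.add("commandRunner.start");
    steps.add("maybeCreateProcessingLogStream");
    heartbeatAgent.ifPresent(Runnable::run);      // agents start only if they were specified
    lagReportingAgent.ifPresent(Runnable::run);
    state = State.READY;                          // flipped last, once every step has completed
  }

  State state() {
    return state;
  }

  List<String> steps() {
    return steps;
  }

  public static void main(String[] args) {
    InitializeSketch app = new InitializeSketch(
        Optional.<Runnable>of(() -> System.out.println("heartbeat agent started")),
        Optional.<Runnable>empty());
    app.initialize();
    System.out.println(app.steps());
    System.out.println(app.state());
  }
}
```

Setting the state as the final statement means that an exception in any earlier step leaves the sketch in INITIALIZING.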
registerCommandTopic¶
void registerCommandTopic()
registerCommandTopic requests the CommandStore for the name of the command topic.
registerCommandTopic makes sure that the internal command topic is available in the Kafka cluster and in sync with backup (if configured).
registerCommandTopic creates the command topic if it does not exist.
maybeCreateProcessingLogStream¶
void maybeCreateProcessingLogStream(
ProcessingLogConfig processingLogConfig,
KsqlConfig ksqlConfig,
KsqlRestConfig restConfig,
KsqlResource ksqlResource,
ServiceContext serviceContext)
maybeCreateProcessingLogStream does nothing and returns immediately when ksql.logging.processing.stream.auto.create is turned off (false).
maybeCreateProcessingLogStream...FIXME
Initializing HeartbeatAgent¶
Optional<HeartbeatAgent> initializeHeartbeatAgent(
KsqlRestConfig restConfig,
KsqlEngine ksqlEngine,
ServiceContext serviceContext,
Optional<LagReportingAgent> lagReportingAgent)
With ksql.heartbeat.enable enabled, initializeHeartbeatAgent builds a HeartbeatAgent.
Otherwise, initializeHeartbeatAgent does nothing and returns no HeartbeatAgent.
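The enabled/disabled behavior maps naturally onto Optional. The following is a minimal sketch under stated assumptions: the configuration is a plain Map instead of a KsqlRestConfig, Agent is an empty placeholder class, and a missing property is treated as false.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only; not the ksqlDB initializeHeartbeatAgent. */
final class HeartbeatAgentSketch {
  static final String HEARTBEAT_ENABLE = "ksql.heartbeat.enable";

  /** Hypothetical placeholder for an agent. */
  static final class Agent { }

  /** Builds an agent only when the flag is on; otherwise returns an empty Optional. */
  static Optional<Agent> initializeHeartbeatAgent(Map<String, String> restConfig) {
    // Assumption: an absent property means the heartbeat is disabled.
    boolean enabled = Boolean.parseBoolean(restConfig.getOrDefault(HEARTBEAT_ENABLE, "false"));
    return enabled ? Optional.of(new Agent()) : Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> enabled = new HashMap<>();
    enabled.put(HEARTBEAT_ENABLE, "true");
    Map<String, String> unset = new HashMap<>();

    System.out.println(initializeHeartbeatAgent(enabled).isPresent()); // prints true
    System.out.println(initializeHeartbeatAgent(unset).isPresent());   // prints false
  }
}
```

Callers can then start the agent with ifPresent and skip the step entirely when nothing was built.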
Logging¶
Enable the ALL logging level for the io.confluent.ksql.rest.server.KsqlRestApplication logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.rest.server.KsqlRestApplication=ALL
Refer to Logging.