ServerVerticle¶
ServerVerticle is a Verticle (Vert.x).
A verticle is a piece of code that can be deployed by Vert.x (...) to provide an actor-like deployment and concurrency model, out of the box.
Creating Instance¶
ServerVerticle takes the following to be created:
- Endpoints
- HttpServerOptions (Vert.x)
- Server
- isInternalListener flag
- PullQueryExecutorMetrics
- LoggingRateLimiter
ServerVerticle is created when:
- Server is requested to start
Starting Verticle¶
void start(
Promise<Void> startPromise)
start is part of the Verticle (Vert.x) abstraction.
start creates a ConnectionQueryManager.
start creates an HttpServer (Vert.x) (with the HttpServerOptions) and registers the request handlers.
URIs¶
Router setupRouter()
setupRouter registers the query handlers.
URI | HTTP Method | Handler |
---|---|---|
/ | GET | ServerVerticle::handleInfoRedirect |
/close-query | POST | CloseQueryHandler |
/clusterStatus | GET | handleClusterStatusRequest |
/healthcheck | GET | handleHealthcheckRequest |
/heartbeat | POST | handleHeartbeatRequest |
/info | GET | handleInfoRequest |
/inserts-stream | POST | InsertsStreamHandler |
/is_valid_property/:property | GET | handleIsValidPropertyRequest |
/ksql | POST | handleKsqlRequest |
/ksql/terminate | POST | handleTerminateRequest |
/lag | POST | handleLagReportRequest |
/query | POST | handleQueryRequest or QueryStreamHandler based on ksql.endpoint.migrate.query |
/query-stream | POST | QueryStreamHandler |
/status/:type/:entity/:action | GET | handleStatusRequest |
/status | GET | handleAllStatusesRequest |
/v1/metadata | GET | handleServerMetadataRequest |
/v1/metadata/id | GET | handleServerMetadataClusterIdRequest |
/ws/query | GET | handleWebsocket |
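The table pairs an HTTP method and a path template with a handler, and some templates carry `:name` path parameters. The following is a minimal plain-Java sketch of that kind of lookup, using only a few rows copied from the table. It is a model for illustration, not the Vert.x `Router` and not the ksqlDB code: the class `RouteTableSketch`, its methods, and the sample request path are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of method + path-template routing (hypothetical, not the Vert.x Router). */
public class RouteTableSketch {

  // {HTTP method, path template, handler label}; rows copied from the table above.
  static final String[][] ROUTES = {
      {"GET", "/info", "handleInfoRequest"},
      {"GET", "/status", "handleAllStatusesRequest"},
      {"GET", "/status/:type/:entity/:action", "handleStatusRequest"},
      {"GET", "/is_valid_property/:property", "handleIsValidPropertyRequest"},
      {"POST", "/ksql", "handleKsqlRequest"},
  };

  /** Returns the label of the first route whose method and template match the request. */
  static Optional<String> findHandler(String method, String path) {
    for (String[] route : ROUTES) {
      if (route[0].equals(method) && matchTemplate(route[1], path).isPresent()) {
        return Optional.of(route[2]);
      }
    }
    return Optional.empty();
  }

  /** Matches a path against a template segment by segment, capturing ":name" segments. */
  static Optional<Map<String, String>> matchTemplate(String template, String path) {
    String[] expected = template.split("/");
    String[] actual = path.split("/");
    if (expected.length != actual.length) {
      return Optional.empty();
    }
    Map<String, String> params = new LinkedHashMap<>();
    for (int i = 0; i < expected.length; i++) {
      if (expected[i].startsWith(":")) {
        if (actual[i].isEmpty()) {
          return Optional.empty(); // a path parameter must not be empty
        }
        params.put(expected[i].substring(1), actual[i]);
      } else if (!expected[i].equals(actual[i])) {
        return Optional.empty();
      }
    }
    return Optional.of(params);
  }

  public static void main(String[] args) {
    // The request path below is illustrative only.
    String path = "/status/stream/KSQL_PROCESSING_LOG/create";
    System.out.println(findHandler("GET", path));
    System.out.println(matchTemplate("/status/:type/:entity/:action", path));
  }
}
```

In this sketch the method is checked together with the path, so a `POST` to `/info` finds no handler even though a `GET` route exists for the same path.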
handleAllStatusesRequest¶
void handleAllStatusesRequest(
RoutingContext routingContext)
handleAllStatusesRequest requests the Endpoints to executeAllStatuses.
$ http http://localhost:8088/status
HTTP/1.1 200 OK
content-length: 69
content-type: application/json
{
    "commandStatuses": {
        "stream/`KSQL_PROCESSING_LOG`/create": "SUCCESS"
    }
}
handleClusterStatusRequest¶
void handleClusterStatusRequest(
RoutingContext routingContext)
handleClusterStatusRequest requests the Endpoints to executeClusterStatus.
$ http -b http://localhost:8088/clusterStatus
handleInfoRequest¶
void handleInfoRequest(
RoutingContext routingContext)
handleInfoRequest requests the Endpoints to executeInfo.
$ http http://localhost:8088/info
HTTP/1.1 200 OK
content-length: 133
content-type: application/json
{
    "KsqlServerInfo": {
        "kafkaClusterId": "kI5f7xZWQaynAgoptiVXJw",
        "ksqlServiceId": "default_",
        "serverStatus": "RUNNING",
        "version": "0.27.2"
    }
}
handleKsqlRequest¶
void handleKsqlRequest(
RoutingContext routingContext)
handleKsqlRequest requests the Endpoints to execute the KsqlRequest.
$ http http://localhost:8088/ksql ksql="LIST STREAMS;"
HTTP/1.1 200 OK
content-length: 224
content-type: application/json
[
    {
        "@type": "streams",
        "statementText": "LIST STREAMS;",
        "streams": [
            {
                "isWindowed": false,
                "keyFormat": "KAFKA",
                "name": "KSQL_PROCESSING_LOG",
                "topic": "default_ksql_processing_log",
                "type": "STREAM",
                "valueFormat": "JSON"
            }
        ],
        "warnings": []
    }
]
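The HTTPie call above sends a POST with a JSON body of the form `{"ksql": "LIST STREAMS;"}`. The same request can be sketched with the JDK's `java.net.http` client (Java 11 or later). The class name `KsqlRequestSketch` and the helper `listStreams` are hypothetical, and the sketch assumes a ksqlDB server listening on `localhost:8088` as in the transcript.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sketch: the LIST STREAMS request from the HTTPie example, built with java.net.http. */
public class KsqlRequestSketch {

  /** Builds a POST to {baseUrl}/ksql carrying the statement as a JSON body. */
  static HttpRequest listStreams(String baseUrl) {
    // Same payload HTTPie produces for: ksql="LIST STREAMS;"
    String body = "{\"ksql\": \"LIST STREAMS;\"}";
    return HttpRequest.newBuilder()
        .uri(URI.create(baseUrl + "/ksql"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) throws Exception {
    // Assumes a server is running locally; otherwise send() throws an IOException.
    HttpClient client = HttpClient.newHttpClient();
    HttpResponse<String> response = client.send(
        listStreams("http://localhost:8088"),
        HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}
```

Building the request is kept separate from sending it so that the request itself can be inspected without a running server.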
handleQueryRequest¶
void handleQueryRequest(
RoutingContext routingContext)
handleQueryRequest uses handleOldApiRequest and requests the Endpoints to executeQueryRequest.
handleQueryRequest is used to handle POST /query HTTP requests.
handleWebsocket¶
void handleWebsocket(
RoutingContext routingContext)
handleWebsocket requests the Endpoints to executeWebsocketStream.
handleWebsocket is used to handle GET /ws/query HTTP requests.