Skip to content

ServerVerticle

ServerVerticle is a Verticle (Vert.x).

A verticle is a piece of code that can be deployed by Vert.x (...) to provide an actor-like deployment and concurrency model, out of the box.

Creating Instance

ServerVerticle takes the following to be created:

  • Endpoints
  • HttpServerOptions (Vert.x)
  • Server
  • isInternalListener flag
  • PullQueryExecutorMetrics
  • LoggingRateLimiter

ServerVerticle is created when:

  • Server is requested to start

Starting Verticle

void start(
  Promise<Void> startPromise)

start is part of the Verticle (Vert.x) abstraction.


start creates a ConnectionQueryManager.

start creates an HttpServer (Vert.x) (with the HttpServerOptions) and registers the request handlers.

URIs

Router setupRouter()

setupRouter registers the query handlers.

URI HTTP Method Handler
/ GET ServerVerticle::handleInfoRedirect
/close-query POST CloseQueryHandler
/clusterStatus GET handleClusterStatusRequest
/healthcheck GET handleHealthcheckRequest
/heartbeat POST handleHeartbeatRequest
/info GET handleInfoRequest
/inserts-stream POST InsertsStreamHandler
/is_valid_property/:property GET handleIsValidPropertyRequest
/ksql POST handleKsqlRequest
/ksql/terminate POST handleTerminateRequest
/lag POST handleLagReportRequest
/query POST handleQueryRequest or QueryStreamHandler based on ksql.endpoint.migrate.query
/query-stream POST QueryStreamHandler
/status/:type/:entity/:action GET handleStatusRequest
/status GET handleAllStatusesRequest
/v1/metadata GET handleServerMetadataRequest
/v1/metadata/id GET handleServerMetadataClusterIdRequest
/ws/query GET handleWebsocket

handleAllStatusesRequest

void handleAllStatusesRequest(
  RoutingContext routingContext)

handleAllStatusesRequest requests the Endpoints to executeAllStatuses.

$ http http://localhost:8088/status
HTTP/1.1 200 OK
content-length: 69
content-type: application/json

{
    "commandStatuses": {
        "stream/`KSQL_PROCESSING_LOG`/create": "SUCCESS"
    }
}

handleClusterStatusRequest

void handleClusterStatusRequest(
  RoutingContext routingContext)

handleClusterStatusRequest requests the Endpoints to executeClusterStatus.

$ http -b http://localhost:8088/clusterStatus

handleInfoRequest

void handleInfoRequest(
  RoutingContext routingContext)

handleInfoRequest requests the Endpoints to executeInfo.

$ http http://localhost:8088/info
HTTP/1.1 200 OK
content-length: 133
content-type: application/json

{
    "KsqlServerInfo": {
        "kafkaClusterId": "kI5f7xZWQaynAgoptiVXJw",
        "ksqlServiceId": "default_",
        "serverStatus": "RUNNING",
        "version": "0.27.2"
    }
}

handleKsqlRequest

void handleKsqlRequest(
  RoutingContext routingContext)

handleKsqlRequest requests the Endpoints to execute the KsqlRequest.

$ http http://localhost:8088/ksql ksql="LIST STREAMS;"
HTTP/1.1 200 OK
content-length: 224
content-type: application/json

[
    {
        "@type": "streams",
        "statementText": "LIST STREAMS;",
        "streams": [
            {
                "isWindowed": false,
                "keyFormat": "KAFKA",
                "name": "KSQL_PROCESSING_LOG",
                "topic": "default_ksql_processing_log",
                "type": "STREAM",
                "valueFormat": "JSON"
            }
        ],
        "warnings": []
    }
]

handleQueryRequest

void handleQueryRequest(
  RoutingContext routingContext)

handleQueryRequest handleOldApiRequest and requests the Endpoints to executeQueryRequest.


handleQueryRequest is used to handle POST /query HTTP requests.

handleWebsocket

void handleWebsocket(
  RoutingContext routingContext)

handleWebsocket requests the Endpoints to executeWebsocketStream.


handleWebsocket is used to handle GET /ws/query HTTP requests.