StreamedQueryResource¶
Creating Instance¶
StreamedQueryResource takes the following to be created:
- KsqlExecutionContext
- KsqlRestConfig
- StatementParser
- CommandQueue
-
disconnectCheckInterval - ksql.server.command.response.timeout.ms
-
ActivenessRegistrar -
Errorshandler -
DenyListPropertyValidator - QueryExecutor
StreamedQueryResource is created when:
KsqlRestApplicationis requested to build a KsqlRestApplication
CommandQueue¶
StreamedQueryResource is given a CommandQueue when created.
The CommandQueue is used to httpWaitForCommandSequenceNumber when streamQuery.
StatementParser¶
StreamedQueryResource is given a StatementParser when created.
The StatementParser is used in parseStatement (to parse a KSQL statement).
streamQuery¶
EndpointResponse streamQuery(
KsqlSecurityContext securityContext,
KsqlRequest request,
CompletableFuture<Void> connectionClosedFuture,
Optional<Boolean> isInternalRequest,
MetricsCallbackHolder metricsCallbackHolder,
Context context)
streamQuery requests the ActivenessRegistrar to updateLastRequestTime.
streamQuery parses the given KsqlRequest (into a PreparedStatement).
streamQuery httpWaitForCommandSequenceNumber (with the commandQueue, the given KsqlRequest, and the commandQueueCatchupTimeout).
In the end, streamQuery handles the KSQL statement.
streamQuery is used when:
KsqlServerEndpointsis requested to executeQueryRequest
parseStatement¶
PreparedStatement<?> parseStatement(
KsqlRequest request)
parseStatement requests the StatementParser to parse the ksql statement (from the given KsqlRequest).
handleStatement¶
EndpointResponse handleStatement(
KsqlSecurityContext securityContext,
KsqlRequest request,
PreparedStatement<?> statement,
CompletableFuture<Void> connectionClosedFuture,
Optional<Boolean> isInternalRequest,
MetricsCallbackHolder metricsCallbackHolder,
Context context)
handleStatement handles Query and PrintTopic statements only.
handleStatement requests the KsqlAuthorizationValidator to checkAuthorization (if defined).
handleStatement requests the DenyListPropertyValidator to validateAll the config overrides (from the given KsqlRequest).
Query¶
For a Query statement, handleStatement shouldMigrateToQueryStream or (if migration did not happen) requests the QueryExecutor to handle the statement and then handleQuery.
PrintTopic¶
For a PrintTopic statement, handleStatement handlePrintTopic.
handleQuery¶
EndpointResponse handleQuery(
PreparedStatement<Query> statement,
CompletableFuture<Void> connectionClosedFuture,
QueryMetadataHolder queryMetadataHolder)
handleQuery handles pull and push queries (and uses the given QueryMetadataHolder to determine the type).
For a pull query, handleQuery requests the given QueryMetadataHolder for the PullQueryResult and returns a new PullQueryStreamWriter in response.
For a push query, handleQuery requests the given QueryMetadataHolder for the PushQueryMetadata and returns a new QueryStreamWriter in response.
For other types, handleQuery responds with 400 Bad Request error code and the following message:
Statement type `className' not supported for this resource
shouldMigrateToQueryStream¶
boolean shouldMigrateToQueryStream(
Map<String, Object> overrides)
shouldMigrateToQueryStream takes the value of ksql.endpoint.migrate.query from the given overrides (if defined) or the system KsqlConfig.
shouldMigrateToQueryStream is used when:
StreamedQueryResourceis requested to handle a query statement