QueryExecutor

Creating Instance

QueryExecutor takes the following to be created:

  • KsqlEngine
  • KsqlRestConfig
  • KsqlConfig
  • PullQueryExecutorMetrics
  • ScalablePushQueryMetrics
  • RateLimiter
  • ConcurrencyLimiter
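  • SlidingWindowRateLimiter (pull query bandwidth, pullBandRateLimiter)
  • SlidingWindowRateLimiter (scalable push query bandwidth, scalablePushBandRateLimiter)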
  • HARouting
  • PushRouting
  • LocalCommands

QueryExecutor is created when:

Handling Statement

QueryMetadataHolder handleStatement(
  ServiceContext serviceContext,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  PreparedStatement<?> statement,
  Optional<Boolean> isInternalRequest,
  MetricsCallbackHolder metricsCallbackHolder,
  Context context,
  boolean excludeTombstones)

For a Query statement, handleStatement passes the statement on to handleQuery. For any other statement, handleStatement returns an empty QueryMetadataHolder.
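The type-based dispatch can be sketched as follows. This is a minimal illustration and not the ksqlDB source: Statement, Query, CreateStream and Holder are hypothetical stand-ins for the real statement classes and QueryMetadataHolder, and the sketch assumes the query branch delegates to handleQuery.

```java
import java.util.Optional;

// Illustrative sketch only; all nested types are hypothetical stand-ins.
final class HandleStatementSketch {

  interface Statement {}

  // Stand-in for a parsed query statement.
  static final class Query implements Statement {
    final String text;

    Query(final String text) {
      this.text = text;
    }
  }

  // Stand-in for any statement that is not a query.
  static final class CreateStream implements Statement {}

  // Stand-in for QueryMetadataHolder: empty unless a query was handled.
  static final class Holder {
    final Optional<String> handledQuery;

    private Holder(final Optional<String> handledQuery) {
      this.handledQuery = handledQuery;
    }

    static Holder empty() {
      return new Holder(Optional.empty());
    }

    static Holder of(final String text) {
      return new Holder(Optional.of(text));
    }
  }

  // Only Query statements are handled; everything else yields an empty holder.
  static Holder handleStatement(final Statement statement) {
    if (statement instanceof Query) {
      return handleQuery((Query) statement);
    }
    return Holder.empty();
  }

  static Holder handleQuery(final Query query) {
    return Holder.of(query.text);
  }

  public static void main(final String[] args) {
    System.out.println(
        handleStatement(new Query("SELECT * FROM s;")).handledQuery.isPresent());
    System.out.println(
        handleStatement(new CreateStream()).handledQuery.isPresent());
  }
}
```

Returning an empty holder instead of throwing lets the caller decide how to treat non-query statements.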

handleStatement is used when:

  • QueryEndpoint is requested to createQueryPublisher
  • StreamedQueryResource is requested to handleStatement
  • WSQueryEndpoint is requested to handleStatement

Handling Query

QueryMetadataHolder handleQuery(
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Optional<Boolean> isInternalRequest,
  MetricsCallbackHolder metricsCallbackHolder,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  Context context,
  boolean excludeTombstones)

handleQuery is used when:

  • QueryExecutor is requested to handleStatement (for a Query statement)

Pull Queries

For a pull query, handleQuery requests the KsqlEngine to analyzeQueryWithNoOutputTopic (that gives an ImmutableAnalysis).

With ksql.pull.queries.enable disabled, handleQuery throws a KsqlStatementException:

Pull queries are disabled.

handleQuery determines a ConsistencyOffsetVector based on ksql.query.pull.consistency.token.enabled in the KsqlConfig and the given requestProperties.

For a KTABLE data source (FROM clause), handleQuery calls handleTablePullQuery.

For a KSTREAM data source (FROM clause), handleQuery calls handleStreamPullQuery.
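The pull query path described in this section can be sketched as below. It is a simplified illustration under stated assumptions, not the actual implementation: configuration is modelled as a plain Map, a missing key is treated as disabled (the real defaults are ignored), IllegalStateException stands in for KsqlStatementException, a String stands in for ConsistencyOffsetVector, and the request property name example.consistency.token is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only; types and return values are hypothetical stand-ins.
final class PullQueryDispatchSketch {

  enum SourceType { KSTREAM, KTABLE }

  static final String PULL_ENABLE = "ksql.pull.queries.enable";
  static final String TOKEN_ENABLE = "ksql.query.pull.consistency.token.enabled";
  // Hypothetical request property that carries a serialized consistency token.
  static final String TOKEN_PROPERTY = "example.consistency.token";

  // Returns a token only when the feature is enabled and the request supplies one.
  static Optional<String> consistencyToken(
      final Map<String, Object> config,
      final Map<String, Object> requestProperties) {
    final boolean enabled = Boolean.TRUE.equals(config.get(TOKEN_ENABLE));
    final Object token = requestProperties.get(TOKEN_PROPERTY);
    if (enabled && token instanceof String) {
      return Optional.of((String) token);
    }
    return Optional.empty();
  }

  // Returns the name of the handler that would run for the given source type.
  static String handlePullQuery(
      final Map<String, Object> config,
      final Map<String, Object> requestProperties,
      final SourceType sourceType) {
    // Simplification: a missing key counts as disabled in this sketch.
    if (!Boolean.TRUE.equals(config.get(PULL_ENABLE))) {
      throw new IllegalStateException("Pull queries are disabled.");
    }
    final Optional<String> token = consistencyToken(config, requestProperties);
    switch (sourceType) {
      case KTABLE:
        // Only the table handler takes the consistency vector (see its signature).
        return "handleTablePullQuery(consistencyToken="
            + (token.isPresent() ? "present" : "absent") + ")";
      case KSTREAM:
        return "handleStreamPullQuery";
      default:
        throw new IllegalArgumentException("Unsupported source: " + sourceType);
    }
  }

  public static void main(final String[] args) {
    final Map<String, Object> config = new HashMap<>();
    config.put(PULL_ENABLE, true);
    config.put(TOKEN_ENABLE, true);
    final Map<String, Object> request = new HashMap<>();
    request.put(TOKEN_PROPERTY, "abc");
    System.out.println(handlePullQuery(config, request, SourceType.KTABLE));
    System.out.println(handlePullQuery(config, request, SourceType.KSTREAM));
  }
}
```

The enable check comes first so that a disabled server rejects the request before any routing decision is made.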

Scalable Push Queries

For a scalable push query, handleQuery requests the KsqlEngine to analyzeQueryWithNoOutputTopic (that gives an ImmutableAnalysis).

handleQuery prints out the following INFO message to the logs:

Scalable push query created

handleQuery then calls handleScalablePushQuery.

Transient Queries

Otherwise, handleQuery calls handlePushQuery after printing out the following INFO message to the logs:

Transient query created
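Taken together, the three branches of handleQuery (pull, scalable push, transient) amount to a three-way dispatch. The sketch below assumes the query has already been classified into a hypothetical QueryKind enum; how the real code classifies a query is not covered here. The in-memory log list and the string return values are illustrative stand-ins for the logger and for QueryMetadataHolder.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; QueryKind and the return values are hypothetical.
final class HandleQuerySketch {

  enum QueryKind { PULL, SCALABLE_PUSH, TRANSIENT_PUSH }

  // Collects the INFO messages that would be written to the logs.
  final List<String> log = new ArrayList<>();

  // Returns the name of the handler that would be invoked for the given kind.
  String handleQuery(final QueryKind kind) {
    switch (kind) {
      case PULL:
        // The stream vs. table decision is omitted in this sketch.
        return "handleTablePullQuery or handleStreamPullQuery";
      case SCALABLE_PUSH:
        log.add("Scalable push query created");
        return "handleScalablePushQuery";
      default:
        // Everything else is treated as a transient push query.
        log.add("Transient query created");
        return "handlePushQuery";
    }
  }

  public static void main(final String[] args) {
    final HandleQuerySketch sketch = new HandleQuerySketch();
    for (final QueryKind kind : QueryKind.values()) {
      System.out.println(kind + " -> " + sketch.handleQuery(kind));
    }
    System.out.println(sketch.log);
  }
}
```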

handlePushQuery

QueryMetadataHolder handlePushQuery(
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Map<String, Object> streamsProperties,
  boolean excludeTombstones)

handlePushQuery...FIXME

Executing Scalable Push Query (handleScalablePushQuery)

QueryMetadataHolder handleScalablePushQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  Context context,
  SlidingWindowRateLimiter scalablePushBandRateLimiter,
  AtomicReference<ScalablePushQueryMetadata> resultForMetrics)

handleScalablePushQuery requests the KsqlEngine to execute a scalable push query.

In the end, handleScalablePushQuery prints out the following INFO message to the logs:

Streaming scalable push query

handleScalablePushQuery is used when:

  • QueryExecutor is requested to handleQuery (for a scalable push query)

handleStreamPullQuery

QueryMetadataHolder handleStreamPullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> configured,
  AtomicReference<StreamPullQueryMetadata> resultForMetrics,
  AtomicReference<Decrementer> refDecrementer)

handleStreamPullQuery...FIXME

handleTablePullQuery

QueryMetadataHolder handleTablePullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> configured,
  Map<String, Object> requestProperties,
  Optional<Boolean> isInternalRequest,
  SlidingWindowRateLimiter pullBandRateLimiter,
  AtomicReference<PullQueryResult> resultForMetrics,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

handleTablePullQuery creates a PullQueryConfigRoutingOptions and a PullQueryConfigPlannerOptions.

In the end, handleTablePullQuery requests the KsqlEngine to executeTablePullQuery.
