QueryExecutor

Creating Instance

QueryExecutor takes the following to be created:

QueryExecutor is created when:

HARouting

QueryExecutor is given a HARouting when created.

The HARouting is used when QueryExecutor is requested to handleTablePullQuery (for the KsqlExecutionContext to executeTablePullQuery).

Handling Statement

QueryMetadataHolder handleStatement(
  ServiceContext serviceContext,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  PreparedStatement<?> statement,
  Optional<Boolean> isInternalRequest,
  MetricsCallbackHolder metricsCallbackHolder,
  Context context,
  boolean excludeTombstones)

When the given PreparedStatement is for a Query, handleStatement hands it over to handleQuery. Otherwise, handleStatement returns an empty QueryMetadataHolder.
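
The dispatch described above can be sketched roughly as follows. This is a minimal illustration, not the ksqlDB source: the Statement, Query and CreateStream classes are hypothetical stand-ins, and an empty Optional plays the role of an empty QueryMetadataHolder.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for the ksqlDB types named in the text.
class HandleStatementSketch {
  interface Statement {}

  static final class Query implements Statement {
    final String sql;
    Query(String sql) { this.sql = sql; }
  }

  // Any non-query statement; used only to exercise the "otherwise" branch.
  static final class CreateStream implements Statement {}

  // An empty Optional stands in for an empty QueryMetadataHolder.
  static Optional<String> handleStatement(Statement statement) {
    if (statement instanceof Query) {
      return Optional.of(handleQuery((Query) statement));
    }
    return Optional.empty();
  }

  // Placeholder for the real query handling.
  static String handleQuery(Query query) {
    return "handled: " + query.sql;
  }

  public static void main(String[] args) {
    System.out.println(handleStatement(new Query("SELECT * FROM t;")));
    System.out.println(handleStatement(new CreateStream()));
  }
}
```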


handleStatement is used when:

Handling Query Statement

QueryMetadataHolder handleQuery(
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Optional<Boolean> isInternalRequest,
  MetricsCallbackHolder metricsCallbackHolder,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  Context context,
  boolean excludeTombstones)

Pull Query

For a pull query, handleQuery requests the KsqlEngine to analyze the query statement with no sink (that gives an ImmutableAnalysis).

With ksql.pull.queries.enable disabled, handleQuery throws a KsqlStatementException:

Pull queries are disabled.

handleQuery determines a ConsistencyOffsetVector based on ksql.query.pull.consistency.token.enabled in the KsqlConfig and the given requestProperties.
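
One plausible way the two inputs combine is sketched below. It is an assumption, not taken from the ksqlDB source: the token is only considered when the feature flag is on and the request carries one. The property key TOKEN_PROPERTY is hypothetical, and a plain String stands in for a ConsistencyOffsetVector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ConsistencyTokenSketch {
  // Hypothetical request-property key, for illustration only.
  static final String TOKEN_PROPERTY = "example.consistency.token";

  // tokenEnabled models the value of ksql.query.pull.consistency.token.enabled.
  // Assumption: a token is used only if the flag is on AND the request provides one.
  static Optional<String> consistencyToken(
      boolean tokenEnabled, Map<String, Object> requestProperties) {
    if (!tokenEnabled) {
      return Optional.empty();
    }
    return Optional.ofNullable(requestProperties.get(TOKEN_PROPERTY))
        .map(Object::toString);
  }

  public static void main(String[] args) {
    Map<String, Object> requestProperties = new HashMap<>();
    requestProperties.put(TOKEN_PROPERTY, "serialized-vector");
    System.out.println(consistencyToken(true, requestProperties));
    System.out.println(consistencyToken(false, requestProperties));
  }
}
```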

handleQuery determines the DataSourceType of the FROM clause (of the DataSource of the ImmutableAnalysis of this pull query).

For a KTABLE data source, handleQuery handles the table pull query.

For a KSTREAM data source, handleQuery handles the stream pull query.
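
The guard and the dispatch on the data source type can be pictured with the sketch below. It is a simplified model under stated assumptions: the enum is a local stand-in for DataSourceType, IllegalStateException stands in for KsqlStatementException, and the returned strings only name the handler that would be called.

```java
class PullQueryDispatchSketch {
  // Local stand-in for the DataSourceType of the FROM clause.
  enum DataSourceType { KTABLE, KSTREAM }

  // pullQueriesEnabled models the value of ksql.pull.queries.enable.
  static String handlePullQuery(boolean pullQueriesEnabled, DataSourceType fromType) {
    if (!pullQueriesEnabled) {
      // The text says a KsqlStatementException is thrown; this is a stand-in.
      throw new IllegalStateException("Pull queries are disabled.");
    }
    switch (fromType) {
      case KTABLE:
        return "handleTablePullQuery";
      case KSTREAM:
        return "handleStreamPullQuery";
      default:
        throw new IllegalArgumentException("Unexpected data source type: " + fromType);
    }
  }

  public static void main(String[] args) {
    System.out.println(handlePullQuery(true, DataSourceType.KTABLE));
    System.out.println(handlePullQuery(true, DataSourceType.KSTREAM));
  }
}
```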

Scalable Push Query

For a scalable push query, handleQuery requests the KsqlEngine to analyze the query statement with no sink (that gives an ImmutableAnalysis).

handleQuery prints out the following INFO message to the logs:

Scalable push query created

handleQuery then calls handleScalablePushQuery.

Transient Query

Otherwise, handleQuery prints out the following INFO message to the logs:

Transient query created

handleQuery then calls handlePushQuery.
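
Taken together, handleQuery routes a query three ways. The sketch below models only that routing and the INFO messages quoted in this section. The two boolean inputs are hypothetical: how ksqlDB actually classifies a query as a pull or a scalable push query is not covered here.

```java
import java.util.Optional;

class QueryRoutingSketch {
  enum Route { PULL, SCALABLE_PUSH, TRANSIENT }

  // Hypothetical inputs: the real classification logic is not modelled.
  static Route route(boolean isPullQuery, boolean isScalablePushQuery) {
    if (isPullQuery) {
      return Route.PULL;
    }
    if (isScalablePushQuery) {
      return Route.SCALABLE_PUSH;
    }
    // Everything else is handled as a transient (push) query.
    return Route.TRANSIENT;
  }

  // INFO messages as quoted in the text; none is listed for the pull branch.
  static Optional<String> infoMessage(Route route) {
    switch (route) {
      case SCALABLE_PUSH:
        return Optional.of("Scalable push query created");
      case TRANSIENT:
        return Optional.of("Transient query created");
      default:
        return Optional.empty();
    }
  }

  public static void main(String[] args) {
    Route route = route(false, false);
    System.out.println(route + " " + infoMessage(route));
  }
}
```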

handlePushQuery

QueryMetadataHolder handlePushQuery(
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Map<String, Object> streamsProperties,
  boolean excludeTombstones)

handlePushQuery...FIXME

handleScalablePushQuery

QueryMetadataHolder handleScalablePushQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  PreparedStatement<Query> statement,
  Map<String, Object> configOverrides,
  Map<String, Object> requestProperties,
  Context context,
  SlidingWindowRateLimiter scalablePushBandRateLimiter,
  AtomicReference<ScalablePushQueryMetadata> resultForMetrics)

handleScalablePushQuery...FIXME

handleScalablePushQuery requests the KsqlEngine to execute a scalable push query.

handleScalablePushQuery...FIXME

In the end, handleScalablePushQuery prints out the following INFO message to the logs:

Streaming scalable push query

handleScalablePushQuery is used when:

Handling Stream Pull Query

QueryMetadataHolder handleStreamPullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> configured,
  AtomicReference<StreamPullQueryMetadata> resultForMetrics,
  AtomicReference<Decrementer> refDecrementer)

In summary, handleStreamPullQuery requests the KsqlExecutionContext for a stream pull query (that gives a StreamPullQueryMetadata to be returned inside a QueryMetadataHolder).


handleStreamPullQuery requests the RateLimiter to checkLimit.

handleStreamPullQuery requests the pullBandRateLimiter to allow a PULL query.

handleStreamPullQuery requests the KsqlExecutionContext for a stream pull query (that gives a StreamPullQueryMetadata).

handleStreamPullQuery requests the LocalCommands (if defined) to write the TransientQueryMetadata.

In the end, handleStreamPullQuery gives a QueryMetadataHolder with the StreamPullQueryMetadata.
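
The order of the steps above can be made visible with a small sketch that records each step as it happens. Everything here is a hypothetical stand-in: the step labels paraphrase the text, and a String takes the place of StreamPullQueryMetadata.

```java
import java.util.ArrayList;
import java.util.List;

class StreamPullQuerySketch {
  // Records the steps in the order they are performed.
  final List<String> steps = new ArrayList<>();
  // Models whether LocalCommands is defined.
  final boolean hasLocalCommands;

  StreamPullQuerySketch(boolean hasLocalCommands) {
    this.hasLocalCommands = hasLocalCommands;
  }

  String handleStreamPullQuery(String query) {
    steps.add("rate limiter: checkLimit");
    steps.add("pull band rate limiter: allow PULL");
    steps.add("execution context: stream pull query");
    // Stand-in for the StreamPullQueryMetadata returned by the execution context.
    String metadata = "StreamPullQueryMetadata(" + query + ")";
    if (hasLocalCommands) {
      steps.add("local commands: write transient query metadata");
    }
    // The real method wraps the metadata in a QueryMetadataHolder.
    return metadata;
  }

  public static void main(String[] args) {
    StreamPullQuerySketch sketch = new StreamPullQuerySketch(true);
    System.out.println(sketch.handleStreamPullQuery("SELECT * FROM s;"));
    sketch.steps.forEach(System.out::println);
  }
}
```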


handleStreamPullQuery is used to handle a stream pull query.

handleTablePullQuery

QueryMetadataHolder handleTablePullQuery(
  ImmutableAnalysis analysis,
  ServiceContext serviceContext,
  ConfiguredStatement<Query> configured,
  Map<String, Object> requestProperties,
  Optional<Boolean> isInternalRequest,
  SlidingWindowRateLimiter pullBandRateLimiter,
  AtomicReference<PullQueryResult> resultForMetrics,
  Optional<ConsistencyOffsetVector> consistencyOffsetVector)

handleTablePullQuery creates a PullQueryConfigRoutingOptions and a PullQueryConfigPlannerOptions.

In the end, handleTablePullQuery requests the KsqlEngine to executeTablePullQuery.