Skip to content

QueryBuilder

Creating Instance

QueryBuilder takes the following to be created:

QueryBuilder is created when:

real Flag

QueryBuilder is given real flag when created.

The flag is used to indicate whether executed in a non-sandboxed (real) environment or not (sandbox) when:

KafkaStreamsBuilder

QueryBuilder can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.

The KafkaStreamsBuilder is used when:

Building Transient Query

TransientQueryMetadata buildTransientQuery(
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones,
  QueryMetadata.Listener listener,
  StreamsBuilder streamsBuilder,
  Optional<ImmutableMap<TopicPartition, Long>> endOffsets)

buildTransientQuery requests the SessionConfig for the KsqlConfig (with overrides applied).

Kafka Streams

This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation)

buildTransientQuery builds the following:

buildTransientQuery requests the given StreamsBuilder (Kafka Streams) to build a Topology (Kafka Streams).

buildTransientQuery determines a ResultType (based on the QueryImplementation and the optional windowInfo):

  • WINDOWED_TABLE for a KTableHolder with the windowInfo specified
  • TABLE for a KTableHolder with no windowInfo specified
  • STREAM for all other cases

In the end, buildTransientQuery creates a TransientQueryMetadata.


buildTransientQuery is used when:

Building RuntimeBuildContext

RuntimeBuildContext buildContext(
  String applicationId,
  QueryId queryId,
  StreamsBuilder streamsBuilder)

buildContext creates a RuntimeBuildContext.


buildContext is used when:

Building Query Implementation

Object buildQueryImplementation(
  ExecutionStep<?> physicalPlan,
  RuntimeBuildContext runtimeBuildContext)

Kafka Streams

This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.

buildQueryImplementation creates a KSPlanBuilder with the given RuntimeBuildContext.

In the end, buildQueryImplementation requests the given ExecutionStep physical plan to build a Kafka Streams application (with the KSPlanBuilder).


buildQueryImplementation is used when QueryBuilder is requested to build the following:

Building Persistent Query (Dedicated Runtime)

PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  StreamsBuilder streamsBuilder,
  MetricCollectors metricCollectors)

buildPersistentQueryInDedicatedRuntime builds an application ID (with the persistent flag enabled).

buildPersistentQueryInDedicatedRuntime builds streams properties.

buildPersistentQueryInDedicatedRuntime builds a physical schema.

buildPersistentQueryInDedicatedRuntime buildContext (with the application ID, the given QueryId and StreamsBuilder).

Kafka Streams

This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation)

buildPersistentQueryInDedicatedRuntime buildQueryImplementation (with the given ExecutionStep physical plan and the RuntimeBuildContext).

buildPersistentQueryInDedicatedRuntime requests the given StreamsBuilder (Kafka Streams) to build a topology (with the streams properties).

In the end, buildPersistentQueryInDedicatedRuntime creates a PersistentQueryMetadataImpl.


buildPersistentQueryInDedicatedRuntime is used when:

Building Persistent Query (Shared Runtime)

PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  String applicationId,
  MetricCollectors metricCollectors)

buildPersistentQueryInSharedRuntime looks up the (shared) KafkaStreams instance for the given applicationId.

buildPersistentQueryInSharedRuntime requests the (shared) KafkaStreams instance for the KafkaStreams instance to create a NamedTopologyBuilder (Kafka Streams) for the given queryId.

buildPersistentQueryInSharedRuntime builds a query implementation for the given physical plan.

buildPersistentQueryInSharedRuntime requests the NamedTopologyBuilder to build a NamedTopology (Kafka Streams).

buildPersistentQueryInSharedRuntime...FIXME


buildPersistentQueryInSharedRuntime is used when:

Finding NamedTopology

NamedTopology getNamedTopology(
  SharedKafkaStreamsRuntime sharedRuntime,
  QueryId queryId,
  String applicationId,
  Map<String, Object>  queryOverrides,
  ExecutionStep<?> physicalPlan)

getNamedTopology...FIXME

getKafkaStreamsInstance

SharedKafkaStreamsRuntime getKafkaStreamsInstance(
  Set<SourceName> sources,
  QueryId queryID,
  MetricCollectors metricCollectors)

getKafkaStreamsInstance...FIXME

Building Streams Properties

Map<String, Object> buildStreamsProperties(
  String applicationId,
  Optional<QueryId> queryId,
  MetricCollectors metricCollectors,
  KsqlConfig config,
  ProcessingLogContext processingLogContext)

buildStreamsProperties requests the given KsqlConfig for getKsqlStreamConfigProps for the given applicationId.

buildStreamsProperties sets StreamsConfig.APPLICATION_ID_CONFIG to the given applicationId.

buildStreamsProperties requests the given ProcessingLogContext for ProcessingLoggerFactory to build a ProcessingLogger.

buildStreamsProperties sets ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER to the ProcessingLogger.

buildStreamsProperties...FIXME


buildStreamsProperties is used when:

getUncaughtExceptionProcessingLogger

ProcessingLogger getUncaughtExceptionProcessingLogger(
  QueryId queryId)

getUncaughtExceptionProcessingLogger...FIXME


getUncaughtExceptionProcessingLogger is used when: