Skip to content

QueryBuilder

Creating Instance

QueryBuilder takes the following to be created:

QueryBuilder is created when:

KafkaStreamsBuilder

QueryBuilder can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.

The KafkaStreamsBuilder is used when:

Building Transient Query

TransientQueryMetadata buildTransientQuery(
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones,
  QueryMetadata.Listener listener,
  StreamsBuilder streamsBuilder,
  Optional<ImmutableMap<TopicPartition, Long>> endOffsets)

Kafka Streams

buildTransientQuery is given a new StreamsBuilder (Kafka Streams) that is used to build a RuntimeBuildContext and then a Topology (Kafka Streams).

That means that the Kafka Streams topology can only be created while building the RuntimeBuildContext.

buildTransientQuery requests the SessionConfig for the KsqlConfig (with overrides applied).

buildTransientQuery builds the following:

buildTransientQuery requests the given StreamsBuilder (Kafka Streams) to build a Topology (Kafka Streams).

buildTransientQuery determines a ResultType (based on the QueryImplementation and the optional windowInfo):

  • WINDOWED_TABLE for a KTableHolder with the windowInfo specified
  • TABLE for a KTableHolder with no windowInfo specified
  • STREAM for all other cases

In the end, buildTransientQuery creates a TransientQueryMetadata.

buildTransientQuery is used when:

Building RuntimeBuildContext

RuntimeBuildContext buildContext(
  String applicationId,
  QueryId queryId,
  StreamsBuilder streamsBuilder)

buildContext creates a RuntimeBuildContext.

buildContext is used when:

Building Query Implementation

Object buildQueryImplementation(
  ExecutionStep<?> physicalPlan,
  RuntimeBuildContext runtimeBuildContext)

Kafka Streams

This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.

buildQueryImplementation creates a KSPlanBuilder with the given RuntimeBuildContext.

In the end, buildQueryImplementation requests the given physical plan to build a Kafka Streams application (with the KSPlanBuilder).

buildQueryImplementation is used when QueryBuilder is requested to build the following:

Building Persistent Query (Shared Runtime)

PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  String applicationId,
  MetricCollectors metricCollectors)

buildPersistentQueryInSharedRuntime looks up the (shared) KafkaStreams instance for the given applicationId.

buildPersistentQueryInSharedRuntime requests the (shared) KafkaStreams instance for the KafkaStreams instance to create a NamedTopologyBuilder (Kafka Streams) for the given queryId.

buildPersistentQueryInSharedRuntime builds a query implementation for the given physical plan.

buildPersistentQueryInSharedRuntime requests the NamedTopologyBuilder to build a NamedTopology (Kafka Streams).

buildPersistentQueryInSharedRuntime...FIXME

buildPersistentQueryInSharedRuntime is used when:

Finding NamedTopology

NamedTopology getNamedTopology(
  SharedKafkaStreamsRuntime sharedRuntime,
  QueryId queryId,
  String applicationId,
  Map<String, Object>  queryOverrides,
  ExecutionStep<?> physicalPlan)

getNamedTopology...FIXME

getKafkaStreamsInstance

SharedKafkaStreamsRuntime getKafkaStreamsInstance(
  Set<SourceName> sources,
  QueryId queryID,
  MetricCollectors metricCollectors)

getKafkaStreamsInstance...FIXME

Building Persistent Query (Dedicated Runtime)

PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  StreamsBuilder streamsBuilder,
  MetricCollectors metricCollectors)

buildPersistentQueryInDedicatedRuntime builds an application ID (with the persistent flag enabled).

buildPersistentQueryInDedicatedRuntime buildStreamsProperties.

buildPersistentQueryInDedicatedRuntime...FIXME

buildPersistentQueryInDedicatedRuntime is used when:

Back to top