QueryBuilder
Creating Instance
QueryBuilder takes the following to be created:
- SessionConfig
- ProcessingLogContext
- ServiceContext
- FunctionRegistry
- KafkaStreamsBuilder
- MaterializationProviderBuilderFactory
- SharedKafkaStreamsRuntimes
- real Flag
QueryBuilder is created when:
- QueryRegistryImpl is created
real Flag
QueryBuilder is given real flag when created.
The flag indicates whether QueryBuilder executes in a non-sandboxed (real) environment or in a sandbox. It is used in:
- buildPersistentQueryInSharedRuntime (to decide what PersistentQueryMetadata to use)
- getKafkaStreamsInstance (to decide what SharedKafkaStreamsRuntime to use)
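A minimal sketch of how a single boolean such as the real flag can select between a real and a sandboxed runtime implementation. SharedRuntimeLike, RealSharedRuntime, SandboxedSharedRuntime and RuntimeSelector are hypothetical stand-ins, not ksqlDB classes; in QueryBuilder the choice is made inside getKafkaStreamsInstance and buildPersistentQueryInSharedRuntime.

```java
// Hypothetical stand-ins for ksqlDB's shared runtime types (not the real classes).
interface SharedRuntimeLike {
  String kind();
}

final class RealSharedRuntime implements SharedRuntimeLike {
  @Override
  public String kind() {
    return "real";
  }
}

final class SandboxedSharedRuntime implements SharedRuntimeLike {
  @Override
  public String kind() {
    return "sandboxed";
  }
}

final class RuntimeSelector {
  // Plays the role of QueryBuilder's real flag (hypothetical class).
  private final boolean real;

  RuntimeSelector(boolean real) {
    this.real = real;
  }

  // real == true: non-sandboxed runtime; real == false: sandboxed runtime.
  SharedRuntimeLike newSharedRuntime() {
    return real ? new RealSharedRuntime() : new SandboxedSharedRuntime();
  }
}
```

Keeping the decision in one place means the rest of the query-building code does not need to know whether it runs for real or in a sandbox.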
KafkaStreamsBuilder
QueryBuilder can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.
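The "use the given builder, otherwise create a default one" behaviour could look like the following sketch. KafkaStreamsBuilderLike, DefaultKafkaStreamsBuilder, ClientSupplierLike and BuilderHolder are hypothetical names standing in for KafkaStreamsBuilder, KafkaStreamsBuilderImpl and the KafkaClientSupplier of the ServiceContext; modelling the optional argument as an Optional is an assumption of the sketch.

```java
import java.util.Optional;

// Hypothetical stand-in for the KafkaClientSupplier taken from the ServiceContext.
interface ClientSupplierLike {}

// Hypothetical stand-in for ksqlDB's KafkaStreamsBuilder.
interface KafkaStreamsBuilderLike {
  String describe();
}

// Hypothetical stand-in for KafkaStreamsBuilderImpl.
final class DefaultKafkaStreamsBuilder implements KafkaStreamsBuilderLike {
  private final ClientSupplierLike clientSupplier;

  DefaultKafkaStreamsBuilder(ClientSupplierLike clientSupplier) {
    this.clientSupplier = clientSupplier;
  }

  ClientSupplierLike clientSupplier() {
    return clientSupplier;
  }

  @Override
  public String describe() {
    return "default";
  }
}

final class BuilderHolder {
  final KafkaStreamsBuilderLike builder;

  // Use the given builder if present; otherwise create the default one lazily
  // from the client supplier.
  BuilderHolder(Optional<KafkaStreamsBuilderLike> given, ClientSupplierLike clientSupplier) {
    this.builder = given.orElseGet(() -> new DefaultKafkaStreamsBuilder(clientSupplier));
  }
}
```

Accepting an externally supplied builder is a common way to let tests inject a fake without touching the default wiring.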
The KafkaStreamsBuilder is used when:
Building Transient Query
TransientQueryMetadata buildTransientQuery(
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones,
  QueryMetadata.Listener listener,
  StreamsBuilder streamsBuilder,
  Optional<ImmutableMap<TopicPartition, Long>> endOffsets)
buildTransientQuery requests the SessionConfig for the KsqlConfig (with overrides applied).
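Assuming "with overrides applied" means that session-level overrides take precedence over the base configuration, the effect can be sketched with plain maps. SessionConfigSketch and effectiveConfig are hypothetical; SessionConfig hands back a KsqlConfig rather than a Map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a session configuration: base properties plus per-session overrides.
final class SessionConfigSketch {
  private final Map<String, Object> base;
  private final Map<String, Object> overrides;

  SessionConfigSketch(Map<String, Object> base, Map<String, Object> overrides) {
    // Defensive copies so later changes by the caller do not leak in.
    this.base = new HashMap<>(base);
    this.overrides = new HashMap<>(overrides);
  }

  // Effective configuration: start from the base and, if requested, let overrides win on conflicts.
  Map<String, Object> effectiveConfig(boolean withOverridesApplied) {
    Map<String, Object> result = new HashMap<>(base);
    if (withOverridesApplied) {
      result.putAll(overrides);
    }
    return Collections.unmodifiableMap(result);
  }
}
```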
Kafka Streams: This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation).
buildTransientQuery builds the following:
- Application ID (with the persistent flag disabled)
- RuntimeBuildContext
- Configuration properties
- QueryImplementation
- TransientQueryQueue
buildTransientQuery requests the given StreamsBuilder (Kafka Streams) to build a Topology (Kafka Streams).
buildTransientQuery determines a ResultType (based on the QueryImplementation and the optional windowInfo):
- WINDOWED_TABLE for a KTableHolder with the windowInfo specified
- TABLE for a KTableHolder with no windowInfo specified
- STREAM for all other cases
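The three rules can be expressed as a small decision function. PlanHolder, TableHolder and StreamHolder are hypothetical stand-ins for KTableHolder and the other holders, and windowInfo is simplified to Optional<String> instead of Optional<WindowInfo>.

```java
import java.util.Optional;

// Hypothetical stand-ins for the holders produced by a query implementation.
interface PlanHolder {}

final class TableHolder implements PlanHolder {}   // plays the role of KTableHolder

final class StreamHolder implements PlanHolder {}  // any non-table result

enum ResultType { STREAM, TABLE, WINDOWED_TABLE }

final class ResultTypes {
  private ResultTypes() {}

  // Tables are windowed or not depending on windowInfo; everything else is a stream.
  static ResultType of(Object queryImplementation, Optional<String> windowInfo) {
    if (queryImplementation instanceof TableHolder) {
      return windowInfo.isPresent() ? ResultType.WINDOWED_TABLE : ResultType.TABLE;
    }
    return ResultType.STREAM;
  }
}
```

Note that windowInfo only matters for tables: a stream result stays STREAM even when windowInfo is present.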
In the end, buildTransientQuery creates a TransientQueryMetadata.
buildTransientQuery is used when:
- QueryRegistryImpl is requested to createTransientQuery and createStreamPullQuery
Building RuntimeBuildContext
RuntimeBuildContext buildContext(
  String applicationId,
  QueryId queryId,
  StreamsBuilder streamsBuilder)
buildContext creates a RuntimeBuildContext.
buildContext is used when:
- QueryBuilder is requested to buildTransientQuery, buildPersistentQueryInDedicatedRuntime, buildPersistentQueryInSharedRuntime and getNamedTopology
Building Query Implementation
Object buildQueryImplementation(
  ExecutionStep<?> physicalPlan,
  RuntimeBuildContext runtimeBuildContext)
Kafka Streams: This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.
buildQueryImplementation creates a KSPlanBuilder with the given RuntimeBuildContext.
In the end, buildQueryImplementation requests the given ExecutionStep physical plan to build a Kafka Streams application (with the KSPlanBuilder).
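One way to read "the physical plan builds itself with the KSPlanBuilder" is visitor-style double dispatch: each step calls back into the builder method for its own type, and the builder builds parent steps first. The sketch below is hypothetical (PlanBuilderSketch, StepSketch, SourceStep, FilterStep, InMemoryPlanBuilder); it evaluates over in-memory lists instead of creating Kafka Streams operators and omits keys, schemas and step contexts.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical plan builder: one visit method per step type.
interface PlanBuilderSketch {
  List<Integer> visitSource(SourceStep step);

  List<Integer> visitFilter(FilterStep step);
}

// Hypothetical execution step: each step dispatches to the matching visit method.
interface StepSketch {
  List<Integer> build(PlanBuilderSketch builder);
}

final class SourceStep implements StepSketch {
  final List<Integer> records;

  SourceStep(List<Integer> records) {
    this.records = records;
  }

  public List<Integer> build(PlanBuilderSketch builder) {
    return builder.visitSource(this);
  }
}

final class FilterStep implements StepSketch {
  final StepSketch source;
  final Predicate<Integer> predicate;

  FilterStep(StepSketch source, Predicate<Integer> predicate) {
    this.source = source;
    this.predicate = predicate;
  }

  public List<Integer> build(PlanBuilderSketch builder) {
    return builder.visitFilter(this);
  }
}

// Stand-in for KSPlanBuilder that evaluates eagerly over lists (an assumption of this sketch).
final class InMemoryPlanBuilder implements PlanBuilderSketch {
  public List<Integer> visitSource(SourceStep step) {
    return step.records;
  }

  public List<Integer> visitFilter(FilterStep step) {
    // Build the parent step first, then apply this step on top of its result.
    return step.source.build(this).stream()
        .filter(step.predicate)
        .collect(Collectors.toList());
  }
}
```

Swapping InMemoryPlanBuilder for another PlanBuilderSketch changes what the same plan produces, which is the point of separating the plan from the builder.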
buildQueryImplementation is used when QueryBuilder is requested to build the following:
- Transient query
- Persistent query in a dedicated runtime
- Persistent query in a shared runtime
Building Persistent Query (Dedicated Runtime)
PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  StreamsBuilder streamsBuilder,
  MetricCollectors metricCollectors)
buildPersistentQueryInDedicatedRuntime builds an application ID (with the persistent flag enabled).
buildPersistentQueryInDedicatedRuntime builds streams properties.
buildPersistentQueryInDedicatedRuntime builds a physical schema.
buildPersistentQueryInDedicatedRuntime builds a RuntimeBuildContext (buildContext) with the application ID and the given QueryId and StreamsBuilder.
Kafka Streams: This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation).
buildPersistentQueryInDedicatedRuntime builds a query implementation (buildQueryImplementation) with the given ExecutionStep physical plan and the RuntimeBuildContext.
buildPersistentQueryInDedicatedRuntime requests the given StreamsBuilder (Kafka Streams) to build a topology (with the streams properties).
In the end, buildPersistentQueryInDedicatedRuntime creates a PersistentQueryMetadataImpl.
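A minimal sketch of how the persistent flag could change the application ID built in the first step. The prefixes (_confluent-ksql-, query_ and transient_) and their concatenation order are assumptions based on commonly seen ksqlDB consumer group names, not something this page states; AppIds is a hypothetical helper.

```java
// Hypothetical helper; the exact format is an ASSUMPTION, verify it against your ksqlDB version.
final class AppIds {
  static final String INTERNAL_PREFIX = "_confluent-ksql-"; // assumed internal prefix
  static final String PERSISTENT_PREFIX = "query_";         // assumed persistent-query prefix
  static final String TRANSIENT_PREFIX = "transient_";      // assumed transient-query prefix

  private AppIds() {}

  // The persistent flag only switches the query-type prefix in this sketch.
  static String build(String serviceId, boolean persistent, String queryId) {
    return INTERNAL_PREFIX + serviceId + (persistent ? PERSISTENT_PREFIX : TRANSIENT_PREFIX) + queryId;
  }
}
```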
buildPersistentQueryInDedicatedRuntime is used when:
- QueryRegistryImpl is requested to createOrReplacePersistentQuery (with no shared runtime ID or ksql.runtime.feature.shared.enabled disabled)
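The condition above implies that a shared runtime is used only when both a shared runtime ID is available and ksql.runtime.feature.shared.enabled is on. RuntimeKind and RuntimeDispatch are hypothetical names; the sketch only captures that decision, not how QueryRegistryImpl obtains the runtime ID.

```java
import java.util.Optional;

enum RuntimeKind { DEDICATED, SHARED }

// Hypothetical dispatch helper (not a ksqlDB class).
final class RuntimeDispatch {
  private RuntimeDispatch() {}

  // sharedRuntimesEnabled plays the role of ksql.runtime.feature.shared.enabled.
  // Shared only when the flag is on AND a shared runtime ID exists; otherwise dedicated.
  static RuntimeKind choose(boolean sharedRuntimesEnabled, Optional<String> sharedRuntimeId) {
    return sharedRuntimesEnabled && sharedRuntimeId.isPresent()
        ? RuntimeKind.SHARED
        : RuntimeKind.DEDICATED;
  }
}
```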
Building Persistent Query (Shared Runtime)
PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  String applicationId,
  MetricCollectors metricCollectors)
buildPersistentQueryInSharedRuntime looks up the (shared) KafkaStreams instance for the given applicationId.
buildPersistentQueryInSharedRuntime requests the KafkaStreams instance of the shared runtime to create a NamedTopologyBuilder (Kafka Streams) for the given queryId.
buildPersistentQueryInSharedRuntime builds a query implementation for the given physical plan.
buildPersistentQueryInSharedRuntime requests the NamedTopologyBuilder to build a NamedTopology (Kafka Streams).
buildPersistentQueryInSharedRuntime...FIXME
buildPersistentQueryInSharedRuntime is used when:
- QueryRegistryImplis requested to createOrReplacePersistentQuery
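The shared-runtime steps above amount to one streams runtime hosting many named topologies, one per query ID. SharedRuntimeSketch is a hypothetical in-memory model of that relationship; topologies are reduced to description strings, and replacing an entry on re-add is a choice of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model: ONE runtime (one application ID) hosting many named topologies.
final class SharedRuntimeSketch {
  private final String applicationId;
  // Named topologies keyed by query ID, in insertion order.
  private final Map<String, String> namedTopologies = new LinkedHashMap<>();

  SharedRuntimeSketch(String applicationId) {
    this.applicationId = applicationId;
  }

  String applicationId() {
    return applicationId;
  }

  // Registers a query's topology under its query ID; re-adding an ID replaces the entry.
  void addNamedTopology(String queryId, String topologyDescription) {
    namedTopologies.put(queryId, topologyDescription);
  }

  void removeNamedTopology(String queryId) {
    namedTopologies.remove(queryId);
  }

  Set<String> queryIds() {
    return namedTopologies.keySet();
  }
}
```

Compared to a dedicated runtime (one KafkaStreams instance per query), queries here share a single application ID and its resources.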
Finding NamedTopology
NamedTopology getNamedTopology(
  SharedKafkaStreamsRuntime sharedRuntime,
  QueryId queryId,
  String applicationId,
  Map<String, Object> queryOverrides,
  ExecutionStep<?> physicalPlan)
getNamedTopology...FIXME
getKafkaStreamsInstance
SharedKafkaStreamsRuntime getKafkaStreamsInstance(
  Set<SourceName> sources,
  QueryId queryID,
  MetricCollectors metricCollectors)
getKafkaStreamsInstance...FIXME
Building Streams Properties
Map<String, Object> buildStreamsProperties(
  String applicationId,
  Optional<QueryId> queryId,
  MetricCollectors metricCollectors,
  KsqlConfig config,
  ProcessingLogContext processingLogContext)
buildStreamsProperties requests the given KsqlConfig for the Kafka Streams configuration properties (getKsqlStreamConfigProps) for the given applicationId.
buildStreamsProperties sets StreamsConfig.APPLICATION_ID_CONFIG to the given applicationId.
buildStreamsProperties requests the given ProcessingLogContext for ProcessingLoggerFactory to build a ProcessingLogger.
buildStreamsProperties sets ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER to the ProcessingLogger.
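The steps above can be sketched with plain maps. Only "application.id" is a real Kafka Streams key (StreamsConfig.APPLICATION_ID_CONFIG); the logger key and the way the logger is named are hypothetical placeholders for ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER and ProcessingLoggerFactory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of buildStreamsProperties (not the ksqlDB implementation).
final class StreamsPropsSketch {
  // Value of StreamsConfig.APPLICATION_ID_CONFIG in Kafka Streams.
  static final String APPLICATION_ID_CONFIG = "application.id";
  // HYPOTHETICAL key standing in for ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER.
  static final String PRODUCTION_ERROR_LOGGER = "hypothetical.production.error.logger";

  private StreamsPropsSketch() {}

  static Map<String, Object> build(
      String applicationId,
      Map<String, Object> ksqlStreamConfigProps,          // stands in for getKsqlStreamConfigProps(applicationId)
      Function<String, Object> processingLoggerFactory) { // stands in for ProcessingLoggerFactory
    // Copy so the caller's map is never modified.
    Map<String, Object> props = new HashMap<>(ksqlStreamConfigProps);
    // Pin the application ID, overriding any value coming from the config.
    props.put(APPLICATION_ID_CONFIG, applicationId);
    // Pass a logger instance through the properties map (hypothetical: named after the app ID).
    props.put(PRODUCTION_ERROR_LOGGER, processingLoggerFactory.apply(applicationId));
    return props;
  }
}
```

Passing an object (not just a string) through the properties map works because Kafka Streams configs are a Map<String, Object>, so components configured from it can read the instance back.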
buildStreamsProperties...FIXME
buildStreamsProperties is used when:
- QueryBuilder is requested to build a transient query or a persistent query in a dedicated or shared runtime (and getKafkaStreamsInstance)
- QueryRegistryImpl is requested to updateStreamsPropertiesAndRestartRuntime (and updateStreamsProperties)
getUncaughtExceptionProcessingLogger
ProcessingLogger getUncaughtExceptionProcessingLogger(
  QueryId queryId)
getUncaughtExceptionProcessingLogger...FIXME
getUncaughtExceptionProcessingLogger is used when:
- QueryBuilder is requested to build a persistent query in a dedicated or shared runtime (and build a PersistentQueryMetadata)