QueryBuilder¶
Creating Instance¶
QueryBuilder takes the following to be created:
- SessionConfig
- ProcessingLogContext
- ServiceContext
- FunctionRegistry
- KafkaStreamsBuilder
- MaterializationProviderBuilderFactory
- SharedKafkaStreamsRuntimes
- real Flag
QueryBuilder is created when:
- QueryRegistryImpl is created
real Flag¶
QueryBuilder is given real flag when created.
The flag indicates whether QueryBuilder runs in a real (non-sandboxed) environment or in a sandbox. It is used in:
- buildPersistentQueryInSharedRuntime (to decide what PersistentQueryMetadata to use)
- getKafkaStreamsInstance (to decide what SharedKafkaStreamsRuntime to use)
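The effect of the real flag can be pictured as a simple switch between two implementations of the same interface. The following is a minimal sketch only: `SharedRuntime`, `RealSharedRuntime` and `SandboxedSharedRuntime` are hypothetical stand-ins, not ksqlDB classes.

```java
// Hypothetical stand-ins: none of the nested types below are ksqlDB's own classes.
final class RealFlagSketch {

  interface SharedRuntime {
    boolean isSandbox();
  }

  static final class RealSharedRuntime implements SharedRuntime {
    @Override
    public boolean isSandbox() {
      return false;
    }
  }

  static final class SandboxedSharedRuntime implements SharedRuntime {
    @Override
    public boolean isSandbox() {
      return true;
    }
  }

  // The real flag selects which implementation is handed out.
  static SharedRuntime newSharedRuntime(final boolean real) {
    return real ? new RealSharedRuntime() : new SandboxedSharedRuntime();
  }

  public static void main(final String[] args) {
    System.out.println(newSharedRuntime(true).getClass().getSimpleName());
    System.out.println(newSharedRuntime(false).getClass().getSimpleName());
  }
}
```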
KafkaStreamsBuilder¶
QueryBuilder can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.
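The fallback described above can be sketched as follows. The nested types are simplified, hypothetical stand-ins for the ksqlDB classes of the same name, and modelling the optional builder as an `Optional` is an assumption made for illustration (the actual constructor arrangement may differ).

```java
import java.util.Optional;

// Simplified stand-ins; only the "use the given builder, else create a default" logic is the point.
final class KafkaStreamsBuilderDefaultSketch {

  interface KafkaClientSupplier {}

  interface KafkaStreamsBuilder {}

  static final class KafkaStreamsBuilderImpl implements KafkaStreamsBuilder {
    final KafkaClientSupplier clientSupplier;

    KafkaStreamsBuilderImpl(final KafkaClientSupplier clientSupplier) {
      this.clientSupplier = clientSupplier;
    }
  }

  static final class ServiceContext {
    private final KafkaClientSupplier clientSupplier;

    ServiceContext(final KafkaClientSupplier clientSupplier) {
      this.clientSupplier = clientSupplier;
    }

    KafkaClientSupplier getKafkaClientSupplier() {
      return clientSupplier;
    }
  }

  // Returns the given builder, or a default one backed by the ServiceContext's client supplier.
  static KafkaStreamsBuilder resolveBuilder(
      final Optional<KafkaStreamsBuilder> given,
      final ServiceContext serviceContext) {
    return given.orElseGet(
        () -> new KafkaStreamsBuilderImpl(serviceContext.getKafkaClientSupplier()));
  }

  public static void main(final String[] args) {
    final ServiceContext ctx = new ServiceContext(new KafkaClientSupplier() {});
    final KafkaStreamsBuilder builder = resolveBuilder(Optional.empty(), ctx);
    System.out.println(builder instanceof KafkaStreamsBuilderImpl);
  }
}
```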
The KafkaStreamsBuilder is used when:
Building Transient Query¶
TransientQueryMetadata buildTransientQuery(
String statementText,
QueryId queryId,
Set<SourceName> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
LogicalSchema schema,
OptionalInt limit,
Optional<WindowInfo> windowInfo,
boolean excludeTombstones,
QueryMetadata.Listener listener,
StreamsBuilder streamsBuilder,
Optional<ImmutableMap<TopicPartition, Long>> endOffsets)
buildTransientQuery requests the SessionConfig for the KsqlConfig (with overrides applied).
Kafka Streams
This is the moment when ksqlDB relies on Kafka Streams (when building a QueryImplementation).
buildTransientQuery builds the following:
- Application ID (with persistent flag disabled)
- RuntimeBuildContext
- Configuration properties
- QueryImplementation
- TransientQueryQueue
buildTransientQuery requests the given StreamsBuilder (Kafka Streams) to build a Topology (Kafka Streams).
buildTransientQuery determines a ResultType (based on the QueryImplementation and the optional windowInfo):
- WINDOWED_TABLE for a KTableHolder with the windowInfo specified
- TABLE for a KTableHolder with no windowInfo specified
- STREAM for all other cases
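The ResultType rules above can be sketched as a small decision function. The nested types are empty, hypothetical stand-ins for the ksqlDB classes of the same name; only the branching mirrors the description.

```java
import java.util.Optional;

// Stand-in types; only the decision logic follows the rules described in the text.
final class ResultTypeSketch {

  enum ResultType { STREAM, TABLE, WINDOWED_TABLE }

  static final class KTableHolder {}

  static final class KStreamHolder {}

  static final class WindowInfo {}

  static ResultType resultTypeOf(
      final Object queryImplementation,
      final Optional<WindowInfo> windowInfo) {
    if (queryImplementation instanceof KTableHolder) {
      // A table result is windowed only when window information is present.
      return windowInfo.isPresent() ? ResultType.WINDOWED_TABLE : ResultType.TABLE;
    }
    // Everything else is treated as a stream.
    return ResultType.STREAM;
  }

  public static void main(final String[] args) {
    System.out.println(resultTypeOf(new KTableHolder(), Optional.of(new WindowInfo())));
    System.out.println(resultTypeOf(new KTableHolder(), Optional.empty()));
    System.out.println(resultTypeOf(new KStreamHolder(), Optional.empty()));
  }
}
```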
In the end, buildTransientQuery creates a TransientQueryMetadata.
buildTransientQuery is used when:
- QueryRegistryImpl is requested to createTransientQuery and createStreamPullQuery
Building RuntimeBuildContext¶
RuntimeBuildContext buildContext(
String applicationId,
QueryId queryId,
StreamsBuilder streamsBuilder)
buildContext creates a RuntimeBuildContext.
buildContext is used when:
- QueryBuilder is requested to buildTransientQuery, buildPersistentQueryInDedicatedRuntime, buildPersistentQueryInSharedRuntime and getNamedTopology
Building Query Implementation¶
Object buildQueryImplementation(
ExecutionStep<?> physicalPlan,
RuntimeBuildContext runtimeBuildContext)
Kafka Streams
This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.
buildQueryImplementation creates a KSPlanBuilder with the given RuntimeBuildContext.
In the end, buildQueryImplementation requests the given ExecutionStep physical plan to build a Kafka Streams application (with the KSPlanBuilder).
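Handing a plan builder to the physical plan is a visitor-style double dispatch: each step calls back into the builder, which assembles the result. The sketch below illustrates the pattern only. The step types, the `visit*` method names and the string-based "topology" are hypothetical and much simpler than ksqlDB's ExecutionStep and KSPlanBuilder.

```java
// Illustrative visitor; the step types and visit methods are hypothetical, not ksqlDB's API.
final class QueryImplementationSketch {

  interface PlanBuilder {
    String visitSource(SourceStep step);

    String visitFilter(FilterStep step);
  }

  interface ExecutionStep {
    String build(PlanBuilder builder);
  }

  static final class SourceStep implements ExecutionStep {
    final String topic;

    SourceStep(final String topic) {
      this.topic = topic;
    }

    @Override
    public String build(final PlanBuilder builder) {
      return builder.visitSource(this);
    }
  }

  static final class FilterStep implements ExecutionStep {
    final ExecutionStep source;
    final String predicate;

    FilterStep(final ExecutionStep source, final String predicate) {
      this.source = source;
      this.predicate = predicate;
    }

    @Override
    public String build(final PlanBuilder builder) {
      return builder.visitFilter(this);
    }
  }

  // Renders the plan as text; a real plan builder would produce streams and tables instead.
  static final class DescribingPlanBuilder implements PlanBuilder {
    @Override
    public String visitSource(final SourceStep step) {
      return "stream(" + step.topic + ")";
    }

    @Override
    public String visitFilter(final FilterStep step) {
      // Build the upstream step first, then append this step's operation.
      return step.source.build(this) + ".filter(" + step.predicate + ")";
    }
  }

  static Object buildQueryImplementation(
      final ExecutionStep physicalPlan,
      final PlanBuilder planBuilder) {
    return physicalPlan.build(planBuilder);
  }

  public static void main(final String[] args) {
    final ExecutionStep plan = new FilterStep(new SourceStep("orders"), "amount > 10");
    System.out.println(buildQueryImplementation(plan, new DescribingPlanBuilder()));
  }
}
```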
buildQueryImplementation is used when QueryBuilder is requested to build the following:
Building Persistent Query (Dedicated Runtime)¶
PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
KsqlConfig ksqlConfig,
KsqlConstants.PersistentQueryType persistentQueryType,
String statementText,
QueryId queryId,
Optional<DataSource> sinkDataSource,
Set<DataSource> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
QueryMetadata.Listener listener,
Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
StreamsBuilder streamsBuilder,
MetricCollectors metricCollectors)
buildPersistentQueryInDedicatedRuntime builds an application ID (with the persistent flag enabled).
buildPersistentQueryInDedicatedRuntime builds streams properties.
buildPersistentQueryInDedicatedRuntime builds a physical schema.
buildPersistentQueryInDedicatedRuntime builds a RuntimeBuildContext (with the application ID, the given QueryId and StreamsBuilder).
Kafka Streams
This is the moment when ksqlDB relies on Kafka Streams (when building a QueryImplementation).
buildPersistentQueryInDedicatedRuntime builds a query implementation (with the given ExecutionStep physical plan and the RuntimeBuildContext).
buildPersistentQueryInDedicatedRuntime requests the given StreamsBuilder (Kafka Streams) to build a topology (with the streams properties).
In the end, buildPersistentQueryInDedicatedRuntime creates a PersistentQueryMetadataImpl.
buildPersistentQueryInDedicatedRuntime is used when:
- QueryRegistryImpl is requested to createOrReplacePersistentQuery (with no shared runtime ID or ksql.runtime.feature.shared.enabled disabled)
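The condition above can be read as a two-way choice between runtimes. The following is a hedged sketch of that reading, not the QueryRegistryImpl code: `RuntimeKind` and `chooseRuntime` are hypothetical names, and the assumption is that the shared runtime is used whenever the dedicated condition does not hold.

```java
import java.util.Optional;

// Hypothetical helper; it encodes only the condition stated in the text.
final class RuntimeChoiceSketch {

  enum RuntimeKind { DEDICATED, SHARED }

  static RuntimeKind chooseRuntime(
      final Optional<String> sharedRuntimeId,
      final boolean sharedRuntimesEnabled) {
    // Dedicated runtime: no shared runtime ID, or the shared-runtime feature is disabled.
    if (!sharedRuntimeId.isPresent() || !sharedRuntimesEnabled) {
      return RuntimeKind.DEDICATED;
    }
    // Assumption: every other case goes to the shared runtime.
    return RuntimeKind.SHARED;
  }

  public static void main(final String[] args) {
    System.out.println(chooseRuntime(Optional.empty(), true));
    System.out.println(chooseRuntime(Optional.of("runtime-0"), true));
  }
}
```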
Building Persistent Query (Shared Runtime)¶
PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
KsqlConfig ksqlConfig,
KsqlConstants.PersistentQueryType persistentQueryType,
String statementText,
QueryId queryId,
Optional<DataSource> sinkDataSource,
Set<DataSource> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
QueryMetadata.Listener listener,
Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
String applicationId,
MetricCollectors metricCollectors)
buildPersistentQueryInSharedRuntime looks up the (shared) KafkaStreams instance for the given applicationId.
buildPersistentQueryInSharedRuntime requests the (shared) KafkaStreams instance to create a NamedTopologyBuilder (Kafka Streams) for the given queryId.
buildPersistentQueryInSharedRuntime builds a query implementation for the given physical plan.
buildPersistentQueryInSharedRuntime requests the NamedTopologyBuilder to build a NamedTopology (Kafka Streams).
buildPersistentQueryInSharedRuntime...FIXME
buildPersistentQueryInSharedRuntime is used when:
- QueryRegistryImpl is requested to createOrReplacePersistentQuery
Finding NamedTopology¶
NamedTopology getNamedTopology(
SharedKafkaStreamsRuntime sharedRuntime,
QueryId queryId,
String applicationId,
Map<String, Object> queryOverrides,
ExecutionStep<?> physicalPlan)
getNamedTopology...FIXME
getKafkaStreamsInstance¶
SharedKafkaStreamsRuntime getKafkaStreamsInstance(
Set<SourceName> sources,
QueryId queryID,
MetricCollectors metricCollectors)
getKafkaStreamsInstance...FIXME
Building Streams Properties¶
Map<String, Object> buildStreamsProperties(
String applicationId,
Optional<QueryId> queryId,
MetricCollectors metricCollectors,
KsqlConfig config,
ProcessingLogContext processingLogContext)
buildStreamsProperties requests the given KsqlConfig for getKsqlStreamConfigProps for the given applicationId.
buildStreamsProperties sets StreamsConfig.APPLICATION_ID_CONFIG to the given applicationId.
buildStreamsProperties requests the given ProcessingLogContext for ProcessingLoggerFactory to build a ProcessingLogger.
buildStreamsProperties sets ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER to the ProcessingLogger.
buildStreamsProperties...FIXME
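The documented steps above can be sketched with plain maps. `application.id` is the value of StreamsConfig.APPLICATION_ID_CONFIG; the key used for the production error logger is a placeholder (the actual value of ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER is not given here), and `baseProps` stands in for what getKsqlStreamConfigProps returns. Any further steps of the real method are not covered.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the documented steps only; the rest of the real method is not reproduced.
final class StreamsPropertiesSketch {

  // Value of StreamsConfig.APPLICATION_ID_CONFIG in Kafka Streams.
  static final String APPLICATION_ID_CONFIG = "application.id";

  // Placeholder key; stands in for ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER.
  static final String PRODUCTION_ERROR_LOGGER_KEY = "hypothetical.production.error.logger";

  static Map<String, Object> buildStreamsProperties(
      final String applicationId,
      final Map<String, Object> baseProps,
      final Object processingLogger) {
    // Copy so that the caller's properties are left untouched.
    final Map<String, Object> props = new HashMap<>(baseProps);
    props.put(APPLICATION_ID_CONFIG, applicationId);
    props.put(PRODUCTION_ERROR_LOGGER_KEY, processingLogger);
    return props;
  }

  public static void main(final String[] args) {
    final Map<String, Object> base = new HashMap<>();
    base.put("num.stream.threads", 4);
    final Map<String, Object> props = buildStreamsProperties("app-1", base, new Object());
    System.out.println(props.get(APPLICATION_ID_CONFIG));
  }
}
```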
buildStreamsProperties is used when:
- QueryBuilder is requested to build a transient query and a persistent query in dedicated or shared runtime (and getKafkaStreamsInstance)
- QueryRegistryImpl is requested to updateStreamsPropertiesAndRestartRuntime (and updateStreamsProperties)
getUncaughtExceptionProcessingLogger¶
ProcessingLogger getUncaughtExceptionProcessingLogger(
QueryId queryId)
getUncaughtExceptionProcessingLogger...FIXME
getUncaughtExceptionProcessingLogger is used when:
- QueryBuilder is requested to build a persistent query in dedicated or shared runtime (and build a PersistentQueryMetadata)