QueryBuilder¶
Creating Instance¶
QueryBuilder takes the following to be created:
- SessionConfig
- ProcessingLogContext
- ServiceContext
- FunctionRegistry
- KafkaStreamsBuilder
- MaterializationProviderBuilderFactory
- SharedKafkaStreamsRuntimes
- real Flag
QueryBuilder is created when:
- QueryRegistryImpl is created
real Flag¶
QueryBuilder is given a real flag when created.
The flag indicates whether the QueryBuilder runs in a non-sandboxed (real) environment or in a sandbox. It is used in:
- buildPersistentQueryInSharedRuntime (to decide what PersistentQueryMetadata to use)
- getKafkaStreamsInstance (to decide what SharedKafkaStreamsRuntime to use)
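As a rough illustration of how a flag like this can steer the choice of an implementation, the following minimal sketch picks between a real and a sandboxed variant. The QueryRuntime interface and both implementations are hypothetical stand-ins invented for this example, not ksqlDB classes.

```java
// Hypothetical stand-ins for illustration; these are not ksqlDB classes.
interface QueryRuntime {
  String start(String queryId);
}

final class RealQueryRuntime implements QueryRuntime {
  @Override
  public String start(final String queryId) {
    return "started " + queryId;
  }
}

final class SandboxedQueryRuntime implements QueryRuntime {
  @Override
  public String start(final String queryId) {
    // A sandbox only validates; it must not cause side effects.
    return "validated " + queryId + " (not started)";
  }
}

final class RealFlagSketch {
  private final boolean real;

  RealFlagSketch(final boolean real) {
    this.real = real;
  }

  // The flag alone decides which variant callers get.
  QueryRuntime runtime() {
    return real ? new RealQueryRuntime() : new SandboxedQueryRuntime();
  }

  public static void main(final String[] args) {
    System.out.println(new RealFlagSketch(true).runtime().start("q1"));
    System.out.println(new RealFlagSketch(false).runtime().start("q1"));
  }
}
```

Keeping the decision in one place means the rest of the builder does not need to know whether it runs for validation only or for real execution.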
KafkaStreamsBuilder¶
QueryBuilder can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.
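The "unless given, create a default" pattern described above could be sketched as follows. StreamsFactory, DefaultStreamsFactory and the String-typed client supplier are hypothetical simplifications chosen for this example; the real types are KafkaStreamsBuilder, KafkaStreamsBuilderImpl and KafkaClientSupplier.

```java
import java.util.Optional;

// Hypothetical simplification of a KafkaStreamsBuilder-like dependency.
interface StreamsFactory {
  String describe();
}

// Hypothetical default that is wired from a client supplier.
final class DefaultStreamsFactory implements StreamsFactory {
  private final String clientSupplier;

  DefaultStreamsFactory(final String clientSupplier) {
    this.clientSupplier = clientSupplier;
  }

  @Override
  public String describe() {
    return "default factory using " + clientSupplier;
  }
}

final class BuilderWithDefaultSketch {
  private final StreamsFactory streamsFactory;

  // The default is only created when no factory was passed in.
  BuilderWithDefaultSketch(
      final String clientSupplier,
      final Optional<StreamsFactory> givenFactory) {
    this.streamsFactory =
        givenFactory.orElseGet(() -> new DefaultStreamsFactory(clientSupplier));
  }

  StreamsFactory streamsFactory() {
    return streamsFactory;
  }
}
```

Accepting the dependency as an optional argument is one common way to let tests inject a custom factory while production code falls back to the default.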
The KafkaStreamsBuilder is used when:
Building Transient Query¶
TransientQueryMetadata buildTransientQuery(
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones,
  QueryMetadata.Listener listener,
  StreamsBuilder streamsBuilder,
  Optional<ImmutableMap<TopicPartition, Long>> endOffsets)
buildTransientQuery requests the SessionConfig for the KsqlConfig (with overrides applied).
Kafka Streams
This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation)
buildTransientQuery builds the following:
- Application ID (with persistent flag disabled)
- RuntimeBuildContext
- Configuration properties
- QueryImplementation
- TransientQueryQueue
buildTransientQuery requests the given StreamsBuilder (Kafka Streams) to build a Topology (Kafka Streams).
buildTransientQuery determines a ResultType (based on the QueryImplementation and the optional windowInfo):
- WINDOWED_TABLE for a KTableHolder with the windowInfo specified
- TABLE for a KTableHolder with no windowInfo specified
- STREAM for all other cases
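The three rules above can be sketched as a small decision function. The nested KTableHolder and KStreamHolder classes are empty hypothetical stand-ins, and resultTypeFor is a name chosen for this example; only the three result types and the decision rules come from the description above.

```java
import java.util.Optional;

final class ResultTypeSketch {
  enum ResultType { STREAM, TABLE, WINDOWED_TABLE }

  // Empty hypothetical stand-ins for what a query implementation can be.
  static final class KTableHolder { }
  static final class KStreamHolder { }

  static ResultType resultTypeFor(
      final Object queryImplementation,
      final Optional<?> windowInfo) {
    if (queryImplementation instanceof KTableHolder) {
      // Tables are split by whether window information is available.
      return windowInfo.isPresent() ? ResultType.WINDOWED_TABLE : ResultType.TABLE;
    }
    // Everything that is not a table is treated as a stream.
    return ResultType.STREAM;
  }

  public static void main(final String[] args) {
    System.out.println(resultTypeFor(new KTableHolder(), Optional.of("window")));
    System.out.println(resultTypeFor(new KTableHolder(), Optional.empty()));
    System.out.println(resultTypeFor(new KStreamHolder(), Optional.empty()));
  }
}
```

Note that in this sketch the windowInfo is ignored for anything that is not a table, which mirrors the "all other cases" rule.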
In the end, buildTransientQuery creates a TransientQueryMetadata.
buildTransientQuery is used when:
- QueryRegistryImpl is requested to createTransientQuery and createStreamPullQuery
Building RuntimeBuildContext¶
RuntimeBuildContext buildContext(
  String applicationId,
  QueryId queryId,
  StreamsBuilder streamsBuilder)
buildContext creates a RuntimeBuildContext.
buildContext is used when:
- QueryBuilder is requested to buildTransientQuery, buildPersistentQueryInDedicatedRuntime, buildPersistentQueryInSharedRuntime and getNamedTopology
Building Query Implementation¶
Object buildQueryImplementation(
  ExecutionStep<?> physicalPlan,
  RuntimeBuildContext runtimeBuildContext)
Kafka Streams
This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.
buildQueryImplementation creates a KSPlanBuilder with the given RuntimeBuildContext.
In the end, buildQueryImplementation requests the given ExecutionStep physical plan to build a Kafka Streams application (with the KSPlanBuilder).
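The hand-off described above, where the plan is asked to build itself with a plan builder, resembles a visitor. The following minimal sketch illustrates that idea only: Step, SourceStep, FilterStep, PlanBuilder and DescribingPlanBuilder are hypothetical names, and the "application" built here is just a descriptive string rather than a Kafka Streams topology.

```java
// Hypothetical visitor; the real counterparts are ExecutionStep and KSPlanBuilder.
interface PlanBuilder {
  String visitSource(SourceStep step);
  String visitFilter(FilterStep step);
}

interface Step {
  // Each step dispatches to the builder method that matches its own type.
  String build(PlanBuilder builder);
}

final class SourceStep implements Step {
  final String topic;

  SourceStep(final String topic) {
    this.topic = topic;
  }

  @Override
  public String build(final PlanBuilder builder) {
    return builder.visitSource(this);
  }
}

final class FilterStep implements Step {
  final Step source;
  final String predicate;

  FilterStep(final Step source, final String predicate) {
    this.source = source;
    this.predicate = predicate;
  }

  @Override
  public String build(final PlanBuilder builder) {
    return builder.visitFilter(this);
  }
}

final class DescribingPlanBuilder implements PlanBuilder {
  @Override
  public String visitSource(final SourceStep step) {
    return "stream(" + step.topic + ")";
  }

  @Override
  public String visitFilter(final FilterStep step) {
    // Build the upstream step first, then append this step's operation.
    return step.source.build(this) + ".filter(" + step.predicate + ")";
  }
}

final class QueryImplementationSketch {
  static Object buildQueryImplementation(final Step physicalPlan) {
    final PlanBuilder planBuilder = new DescribingPlanBuilder();
    return physicalPlan.build(planBuilder);
  }
}
```

With this shape, the plan itself stays independent of the target runtime: a different PlanBuilder could produce a different kind of result from the same steps.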
buildQueryImplementation is used when QueryBuilder is requested to build the following:
Building Persistent Query (Dedicated Runtime)¶
PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  StreamsBuilder streamsBuilder,
  MetricCollectors metricCollectors)
buildPersistentQueryInDedicatedRuntime builds an application ID (with the persistent flag enabled).
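How a persistent flag could influence an application ID is sketched below. The layout (service ID, then a prefix, then the query ID) and both prefix values are assumptions made for illustration; this page does not describe the actual format.

```java
final class ApplicationIdSketch {
  // Assumed prefixes, for illustration only.
  static final String PERSISTENT_PREFIX = "query_";
  static final String TRANSIENT_PREFIX = "transient_";

  // Assumed layout: <serviceId><prefix><queryId>.
  static String build(
      final String serviceId,
      final boolean persistent,
      final String queryId) {
    final String prefix = persistent ? PERSISTENT_PREFIX : TRANSIENT_PREFIX;
    return serviceId + prefix + queryId;
  }

  public static void main(final String[] args) {
    System.out.println(build("svc_", true, "CSAS_ORDERS_1"));
    System.out.println(build("svc_", false, "12345"));
  }
}
```

Deriving the ID from the flag keeps persistent and transient queries in separate, recognizable namespaces.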
buildPersistentQueryInDedicatedRuntime builds streams properties.
buildPersistentQueryInDedicatedRuntime builds a physical schema.
buildPersistentQueryInDedicatedRuntime builds a RuntimeBuildContext using buildContext (with the application ID, the given QueryId and StreamsBuilder).
Kafka Streams
This is the moment where ksqlDB relies on Kafka Streams (when building a QueryImplementation)
buildPersistentQueryInDedicatedRuntime builds a query implementation using buildQueryImplementation (with the given ExecutionStep physical plan and the RuntimeBuildContext).
buildPersistentQueryInDedicatedRuntime requests the given StreamsBuilder (Kafka Streams) to build a topology (with the streams properties).
In the end, buildPersistentQueryInDedicatedRuntime creates a PersistentQueryMetadataImpl.
buildPersistentQueryInDedicatedRuntime is used when:
- QueryRegistryImpl is requested to createOrReplacePersistentQuery (with no shared runtime ID or ksql.runtime.feature.shared.enabled disabled)
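The condition above can be read as a small decision function. In this sketch, RuntimeKind and choose are hypothetical names; that the shared runtime is used in exactly the complementary case is an assumption, since only the dedicated-runtime condition is stated here.

```java
import java.util.Optional;

final class RuntimeChoiceSketch {
  enum RuntimeKind { DEDICATED, SHARED }

  // sharedRuntimesEnabled stands for the ksql.runtime.feature.shared.enabled setting.
  static RuntimeKind choose(
      final Optional<String> sharedRuntimeId,
      final boolean sharedRuntimesEnabled) {
    if (!sharedRuntimeId.isPresent() || !sharedRuntimesEnabled) {
      // No shared runtime ID, or the feature is disabled.
      return RuntimeKind.DEDICATED;
    }
    // Assumption: every remaining case goes to a shared runtime.
    return RuntimeKind.SHARED;
  }
}
```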
Building Persistent Query (Shared Runtime)¶
PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
  KsqlConfig ksqlConfig,
  KsqlConstants.PersistentQueryType persistentQueryType,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  QueryMetadata.Listener listener,
  Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
  String applicationId,
  MetricCollectors metricCollectors)
buildPersistentQueryInSharedRuntime looks up the shared Kafka Streams runtime (SharedKafkaStreamsRuntime) for the given applicationId.
buildPersistentQueryInSharedRuntime requests the shared runtime for its KafkaStreams instance, which in turn is requested to create a NamedTopologyBuilder (Kafka Streams) for the given queryId.
buildPersistentQueryInSharedRuntime builds a query implementation for the given physical plan.
buildPersistentQueryInSharedRuntime requests the NamedTopologyBuilder to build a NamedTopology (Kafka Streams).
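The idea of one shared runtime hosting one named topology per query can be modelled in memory as follows. SharedRuntimeSketch and its methods are hypothetical, a topology is reduced to a descriptive string, and rejecting a duplicate query ID is a choice made for this sketch. It does not use the Kafka Streams NamedTopology API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class SharedRuntimeSketch {
  private final String applicationId;
  // One entry per query: query ID -> description of its named topology.
  private final Map<String, String> topologiesByQueryId = new LinkedHashMap<>();

  SharedRuntimeSketch(final String applicationId) {
    this.applicationId = applicationId;
  }

  String applicationId() {
    return applicationId;
  }

  // In this sketch, a query ID can only be registered once.
  void addNamedTopology(final String queryId, final String topology) {
    if (topologiesByQueryId.putIfAbsent(queryId, topology) != null) {
      throw new IllegalStateException("Topology already registered for " + queryId);
    }
  }

  Set<String> queryIds() {
    return Collections.unmodifiableSet(topologiesByQueryId.keySet());
  }
}
```

Keying topologies by query ID is what would let several queries share a single application ID while remaining individually addressable.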
buildPersistentQueryInSharedRuntime ...FIXME
buildPersistentQueryInSharedRuntime is used when:
- QueryRegistryImpl is requested to createOrReplacePersistentQuery
Finding NamedTopology¶
NamedTopology getNamedTopology(
  SharedKafkaStreamsRuntime sharedRuntime,
  QueryId queryId,
  String applicationId,
  Map<String, Object> queryOverrides,
  ExecutionStep<?> physicalPlan)
getNamedTopology ...FIXME
getKafkaStreamsInstance¶
SharedKafkaStreamsRuntime getKafkaStreamsInstance(
  Set<SourceName> sources,
  QueryId queryID,
  MetricCollectors metricCollectors)
getKafkaStreamsInstance ...FIXME
Building Streams Properties¶
Map<String, Object> buildStreamsProperties(
  String applicationId,
  Optional<QueryId> queryId,
  MetricCollectors metricCollectors,
  KsqlConfig config,
  ProcessingLogContext processingLogContext)
buildStreamsProperties requests the given KsqlConfig to getKsqlStreamConfigProps for the given applicationId.
buildStreamsProperties sets StreamsConfig.APPLICATION_ID_CONFIG to the given applicationId.
buildStreamsProperties requests the given ProcessingLogContext for the ProcessingLoggerFactory to build a ProcessingLogger.
buildStreamsProperties sets ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER to the ProcessingLogger.
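The property assembly described so far might look roughly like the sketch below. The "application.id" key is the value of StreamsConfig.APPLICATION_ID_CONFIG in Kafka Streams. The logger key is a placeholder invented for this example (the actual value of KSQL_PRODUCTION_ERROR_LOGGER is not given here), and the logger is a plain Object stand-in for a ProcessingLogger.

```java
import java.util.HashMap;
import java.util.Map;

final class StreamsPropertiesSketch {
  // Value of StreamsConfig.APPLICATION_ID_CONFIG in Kafka Streams.
  static final String APPLICATION_ID_CONFIG = "application.id";
  // Placeholder key; stands in for ProductionExceptionHandlerUtil.KSQL_PRODUCTION_ERROR_LOGGER.
  static final String PRODUCTION_ERROR_LOGGER = "sketch.production.error.logger";

  static Map<String, Object> build(
      final String applicationId,
      final Map<String, Object> baseProps,
      final Object processingLogger) {
    // Copy so that the caller's base properties are never mutated.
    final Map<String, Object> props = new HashMap<>(baseProps);
    props.put(APPLICATION_ID_CONFIG, applicationId);
    // The logger object itself travels in the property map.
    props.put(PRODUCTION_ERROR_LOGGER, processingLogger);
    return props;
  }
}
```

Passing an object (not just strings) through the property map is what allows a component configured by those properties to pick up the logger later.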
buildStreamsProperties ...FIXME
buildStreamsProperties is used when:
- QueryBuilder is requested to build a transient query and a persistent query in dedicated or shared runtime (and getKafkaStreamsInstance)
- QueryRegistryImpl is requested to updateStreamsPropertiesAndRestartRuntime (and updateStreamsProperties)
getUncaughtExceptionProcessingLogger¶
ProcessingLogger getUncaughtExceptionProcessingLogger(
  QueryId queryId)
getUncaughtExceptionProcessingLogger ...FIXME
getUncaughtExceptionProcessingLogger is used when:
- QueryBuilder is requested to build a persistent query in a dedicated or shared runtime (and build a PersistentQueryMetadata)