QueryBuilder¶
Creating Instance¶
QueryBuilder
takes the following to be created:
-
SessionConfig
-
ProcessingLogContext
- ServiceContext
-
FunctionRegistry
- KafkaStreamsBuilder
-
MaterializationProviderBuilderFactory
- SharedKafkaStreamsRuntimes
-
real
flag
QueryBuilder
is created when:
QueryRegistryImpl
is created
KafkaStreamsBuilder¶
QueryBuilder
can be given a KafkaStreamsBuilder when created. Unless given, QueryBuilder
creates a KafkaStreamsBuilderImpl with the KafkaClientSupplier from the given ServiceContext.
The KafkaStreamsBuilder
is used when:
Building Transient Query¶
TransientQueryMetadata buildTransientQuery(
String statementText,
QueryId queryId,
Set<SourceName> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
LogicalSchema schema,
OptionalInt limit,
Optional<WindowInfo> windowInfo,
boolean excludeTombstones,
QueryMetadata.Listener listener,
StreamsBuilder streamsBuilder,
Optional<ImmutableMap<TopicPartition, Long>> endOffsets)
Kafka Streams
buildTransientQuery
is given a new StreamsBuilder
(Kafka Streams) that is used to build a RuntimeBuildContext and then a Topology
(Kafka Streams).
That means that the Kafka Streams topology can only be created while building the RuntimeBuildContext.
buildTransientQuery
requests the SessionConfig for the KsqlConfig (with overrides applied).
buildTransientQuery
builds the following:
- Application ID (with
persistent
flag disabled) - RuntimeBuildContext
- Configuration properties
- QueryImplementation
- TransientQueryQueue
buildTransientQuery
requests the given StreamsBuilder
(Kafka Streams) to build a Topology
(Kafka Streams).
buildTransientQuery
determines a ResultType
(based on the QueryImplementation
and the optional windowInfo
):
WINDOWED_TABLE
for aKTableHolder
with thewindowInfo
specifiedTABLE
for aKTableHolder
with nowindowInfo
specifiedSTREAM
for all other cases
In the end, buildTransientQuery
creates a TransientQueryMetadata.
buildTransientQuery
is used when:
QueryRegistryImpl
is requested to createTransientQuery and createStreamPullQuery
Building RuntimeBuildContext¶
RuntimeBuildContext buildContext(
String applicationId,
QueryId queryId,
StreamsBuilder streamsBuilder)
buildContext
creates a RuntimeBuildContext.
buildContext
is used when:
QueryBuilder
is requested to buildTransientQuery, buildPersistentQueryInDedicatedRuntime, buildPersistentQueryInSharedRuntime and getNamedTopology
Building Query Implementation¶
Object buildQueryImplementation(
ExecutionStep<?> physicalPlan,
RuntimeBuildContext runtimeBuildContext)
Kafka Streams
This is the moment in a ksqlDB query's life cycle when the physical plan is converted into a Kafka Streams application.
buildQueryImplementation
creates a KSPlanBuilder with the given RuntimeBuildContext.
In the end, buildQueryImplementation
requests the given physical plan to build a Kafka Streams application (with the KSPlanBuilder
).
buildQueryImplementation
is used when QueryBuilder
is requested to build the following:
PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
KsqlConfig ksqlConfig,
KsqlConstants.PersistentQueryType persistentQueryType,
String statementText,
QueryId queryId,
Optional<DataSource> sinkDataSource,
Set<DataSource> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
QueryMetadata.Listener listener,
Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
String applicationId,
MetricCollectors metricCollectors)
buildPersistentQueryInSharedRuntime
looks up the (shared) KafkaStreams instance for the given applicationId
.
buildPersistentQueryInSharedRuntime
requests the (shared) KafkaStreams instance for the KafkaStreams
instance to create a NamedTopologyBuilder
(Kafka Streams) for the given queryId
.
buildPersistentQueryInSharedRuntime
builds a query implementation for the given physical plan.
buildPersistentQueryInSharedRuntime
requests the NamedTopologyBuilder
to build a NamedTopology
(Kafka Streams).
buildPersistentQueryInSharedRuntime
...FIXME
buildPersistentQueryInSharedRuntime
is used when:
QueryRegistryImpl
is requested to createOrReplacePersistentQuery
Finding NamedTopology¶
NamedTopology getNamedTopology(
SharedKafkaStreamsRuntime sharedRuntime,
QueryId queryId,
String applicationId,
Map<String, Object> queryOverrides,
ExecutionStep<?> physicalPlan)
getNamedTopology
...FIXME
getKafkaStreamsInstance¶
SharedKafkaStreamsRuntime getKafkaStreamsInstance(
Set<SourceName> sources,
QueryId queryID,
MetricCollectors metricCollectors)
getKafkaStreamsInstance
...FIXME
Building Persistent Query (Dedicated Runtime)¶
PersistentQueryMetadata buildPersistentQueryInDedicatedRuntime(
KsqlConfig ksqlConfig,
KsqlConstants.PersistentQueryType persistentQueryType,
String statementText,
QueryId queryId,
Optional<DataSource> sinkDataSource,
Set<DataSource> sources,
ExecutionStep<?> physicalPlan,
String planSummary,
QueryMetadata.Listener listener,
Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
StreamsBuilder streamsBuilder,
MetricCollectors metricCollectors)
buildPersistentQueryInDedicatedRuntime
builds an application ID (with the persistent
flag enabled).
buildPersistentQueryInDedicatedRuntime
buildStreamsProperties.
buildPersistentQueryInDedicatedRuntime
...FIXME
buildPersistentQueryInDedicatedRuntime
is used when:
QueryRegistryImpl
is requested to createOrReplacePersistentQuery (with a shared runtime ID)