QueryRegistryImpl

Creating Instance

QueryRegistryImpl takes the following to be created:

QueryRegistryImpl is created when:

QueryBuilderFactory

When created, QueryRegistryImpl is either given a QueryBuilderFactory or defines a function of its own to create a QueryBuilder.
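
The two options can be pictured as a pair of constructors. The following is only a minimal sketch: the Registry class, the single String config argument and the simplified QueryBuilder are hypothetical stand-ins, not the actual ksqlDB types or signatures.

```java
import java.util.Objects;

final class QueryBuilderFactorySketch {

  // Hypothetical stand-in for QueryBuilder; the real class has many more collaborators.
  static final class QueryBuilder {
    final String config;

    QueryBuilder(String config) {
      this.config = config;
    }
  }

  // Hypothetical, simplified factory shape (one String argument is an assumption).
  @FunctionalInterface
  interface QueryBuilderFactory {
    QueryBuilder create(String config);
  }

  static final class Registry {
    private final QueryBuilderFactory queryBuilderFactory;

    // No factory given: the registry defines its own function to create a QueryBuilder.
    Registry() {
      this(QueryBuilder::new);
    }

    // Factory given explicitly, e.g. to substitute a stub in tests.
    Registry(QueryBuilderFactory queryBuilderFactory) {
      this.queryBuilderFactory = Objects.requireNonNull(queryBuilderFactory);
    }

    QueryBuilder newQueryBuilder(String config) {
      return queryBuilderFactory.create(config);
    }
  }

  public static void main(String[] args) {
    Registry withDefault = new Registry();
    Registry withStub = new Registry(config -> new QueryBuilder("stub:" + config));
    System.out.println(withDefault.newQueryBuilder("cfg").config);
    System.out.println(withStub.newQueryBuilder("cfg").config);
  }
}
```

Accepting the factory as a parameter keeps query creation replaceable, which is the usual reason for this kind of constructor pair.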

Creating Stream Pull Query

TransientQueryMetadata createStreamPullQuery(
  SessionConfig config,
  ServiceContext serviceContext,
  ProcessingLogContext processingLogContext,
  MetaStore metaStore,
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones,
  ImmutableMap<TopicPartition, Long> endOffsets)

createStreamPullQuery is part of the QueryRegistry abstraction.


Note

createStreamPullQuery is createTransientQuery with endOffsets.

createStreamPullQuery requests the QueryBuilderFactory for a QueryBuilder and requests the QueryBuilder to buildTransientQuery (with a new Kafka Streams StreamsBuilder).

createStreamPullQuery requests the TransientQueryMetadata to initialize.

createStreamPullQuery notifies the QueryEventListeners.

Creating Transient Query

TransientQueryMetadata createTransientQuery(
  SessionConfig config,
  ServiceContext serviceContext,
  ProcessingLogContext processingLogContext,
  MetaStore metaStore,
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones)

createTransientQuery is part of the QueryRegistry abstraction.


Note

createTransientQuery is createStreamPullQuery with no endOffsets.

createTransientQuery requests the QueryBuilderFactory for a QueryBuilder.

createTransientQuery requests the QueryBuilder to build a transient query with a new StreamsBuilder (Kafka Streams). That gives a TransientQueryMetadata.

createTransientQuery requests the TransientQueryMetadata to initialize.

In the end, createTransientQuery registers the transient query (registerTransientQuery) and returns the TransientQueryMetadata.
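
The flow of createTransientQuery and createStreamPullQuery can be sketched side by side. This is a simplified illustration under stated assumptions: TransientQuery, Registry, the String-keyed maps and the notifications list are hypothetical stand-ins, and what registerTransientQuery does internally (tracking the query among the live queries, then notifying listeners) is assumed rather than taken from the text above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class TransientQuerySketch {

  // Hypothetical stand-in for TransientQueryMetadata.
  static final class TransientQuery {
    final String queryId;
    final Optional<Map<String, Long>> endOffsets;
    boolean initialized;

    TransientQuery(String queryId, Optional<Map<String, Long>> endOffsets) {
      this.queryId = queryId;
      this.endOffsets = endOffsets;
    }

    void initialize() {
      initialized = true;
    }
  }

  static final class Registry {
    // Assumption: registering a transient query tracks it among the live queries.
    final Map<String, TransientQuery> allLiveQueries = new LinkedHashMap<>();
    // Records listener notifications so the flow can be inspected.
    final List<String> notifications = new ArrayList<>();

    // No endOffsets: build, initialize, register, return.
    TransientQuery createTransientQuery(String queryId) {
      TransientQuery query = build(queryId, Optional.empty());
      query.initialize();
      registerTransientQuery(query);
      return query;
    }

    // With endOffsets: build, initialize, notify listeners, return.
    TransientQuery createStreamPullQuery(String queryId, Map<String, Long> endOffsets) {
      TransientQuery query = build(queryId, Optional.of(endOffsets));
      query.initialize();
      notifyCreate(query);
      return query;
    }

    // Stand-in for QueryBuilder.buildTransientQuery with a fresh StreamsBuilder.
    private TransientQuery build(String queryId, Optional<Map<String, Long>> endOffsets) {
      return new TransientQuery(queryId, endOffsets);
    }

    private void registerTransientQuery(TransientQuery query) {
      allLiveQueries.put(query.queryId, query);
      notifyCreate(query);
    }

    private void notifyCreate(TransientQuery query) {
      notifications.add("onCreate:" + query.queryId);
    }
  }
}
```

In this sketch the only differences between the two methods are the endOffsets argument and the final step (register versus notify only), which mirrors the descriptions of the two methods.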

Creating or Replacing Persistent Query

PersistentQueryMetadata createOrReplacePersistentQuery(
  SessionConfig config,
  ServiceContext serviceContext,
  ProcessingLogContext processingLogContext,
  MetaStore metaStore,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  KsqlConstants.PersistentQueryType persistentQueryType,
  Optional<String> sharedRuntimeId)

createOrReplacePersistentQuery is part of the QueryRegistry abstraction.


createOrReplacePersistentQuery requests the QueryBuilderFactory for a QueryBuilder to build a persistent query in shared or dedicated runtime based on the given sharedRuntimeId (available or not, respectively).

In the end, createOrReplacePersistentQuery registers the persistent query.
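
The choice between a shared and a dedicated runtime comes down to whether sharedRuntimeId is present. A minimal sketch of that decision follows; the RuntimeKind enum and the chooseRuntime method are hypothetical names used only for illustration.

```java
import java.util.Optional;

final class RuntimeChoiceSketch {

  // Hypothetical labels for the two ways a persistent query can be hosted.
  enum RuntimeKind { SHARED, DEDICATED }

  // A present sharedRuntimeId selects the shared runtime, an empty one the dedicated runtime.
  static RuntimeKind chooseRuntime(Optional<String> sharedRuntimeId) {
    return sharedRuntimeId.isPresent() ? RuntimeKind.SHARED : RuntimeKind.DEDICATED;
  }

  public static void main(String[] args) {
    System.out.println(chooseRuntime(Optional.of("runtime-0")));
    System.out.println(chooseRuntime(Optional.empty()));
  }
}
```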

registerPersistentQuery

void registerPersistentQuery(
  ServiceContext serviceContext,
  MetaStore metaStore,
  PersistentQueryMetadata persistentQuery)

registerPersistentQuery takes the QueryId from the given PersistentQueryMetadata.

registerPersistentQuery requests the given PersistentQueryMetadata to initialize when this is a new query (a new QueryId) or the old query is not sandboxed.

registerPersistentQuery adds the QueryId with the PersistentQueryMetadata to the persistentQueries registry.

registerPersistentQuery records the query ID in a registry that depends on the query type:

  • For CREATE_SOURCE, the single source name is registered with the query ID in the createAsQueries registry

  • For CREATE_AS, the sink name is registered with the query ID in the createAsQueries registry

  • For INSERT, all the sink and source names are registered with the query ID in the insertQueries registry

registerPersistentQuery adds the QueryId with the PersistentQueryMetadata to the allLiveQueries registry.

In the end, registerPersistentQuery notifies event listeners.
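
The bookkeeping described above can be sketched as follows. This is an illustration under assumptions, not the actual implementation: PersistentQuery, Registry, the use of plain strings for query IDs and source names, and the map shapes of the registries are all hypothetical, and the initialization step is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class RegisterPersistentQuerySketch {

  enum QueryType { CREATE_SOURCE, CREATE_AS, INSERT }

  // Hypothetical stand-in for PersistentQueryMetadata; names are plain strings here.
  static final class PersistentQuery {
    final String queryId;
    final QueryType type;
    final Optional<String> sinkName;
    final Set<String> sourceNames;

    PersistentQuery(String queryId, QueryType type,
                    Optional<String> sinkName, Set<String> sourceNames) {
      this.queryId = queryId;
      this.type = type;
      this.sinkName = sinkName;
      this.sourceNames = sourceNames;
    }
  }

  static final class Registry {
    // Registry shapes are assumptions made for this sketch.
    final Map<String, PersistentQuery> persistentQueries = new HashMap<>();
    final Map<String, String> createAsQueries = new HashMap<>();
    final Map<String, Set<String>> insertQueries = new HashMap<>();
    final Map<String, PersistentQuery> allLiveQueries = new HashMap<>();
    final List<String> notifications = new ArrayList<>();

    void registerPersistentQuery(PersistentQuery query) {
      String queryId = query.queryId;
      persistentQueries.put(queryId, query);

      switch (query.type) {
        case CREATE_SOURCE:
          // The single source name maps to the query ID.
          createAsQueries.put(query.sourceNames.iterator().next(), queryId);
          break;
        case CREATE_AS:
          // The sink name maps to the query ID.
          createAsQueries.put(
              query.sinkName.orElseThrow(
                  () -> new IllegalArgumentException("CREATE_AS query needs a sink")),
              queryId);
          break;
        case INSERT: {
          // Every sink and source name is associated with the query ID.
          Set<String> names = new HashSet<>(query.sourceNames);
          query.sinkName.ifPresent(names::add);
          for (String name : names) {
            insertQueries.computeIfAbsent(name, key -> new HashSet<>()).add(queryId);
          }
          break;
        }
        default:
          throw new IllegalStateException("Unknown query type: " + query.type);
      }

      allLiveQueries.put(queryId, query);
      notifications.add("onCreate:" + queryId); // stand-in for notifying event listeners
    }
  }
}
```

Modelling insertQueries as a map from a name to a set of query IDs reflects that several INSERT queries may touch the same source, whereas createAsQueries holds one query ID per name.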

Notifying QueryEventListeners about Create Queries

void notifyCreate(
  ServiceContext serviceContext,
  MetaStore metaStore,
  QueryMetadata queryMetadata)

notifyCreate requests the QueryEventListeners to onCreate.
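
The notification is a plain fan-out over the registered listeners. A minimal sketch follows, assuming a simplified listener whose onCreate takes only a query ID; the real callback presumably also receives the ServiceContext and MetaStore that notifyCreate is given.

```java
import java.util.ArrayList;
import java.util.List;

final class NotifyCreateSketch {

  // Hypothetical, simplified listener shape (a single String argument is an assumption).
  interface QueryEventListener {
    void onCreate(String queryId);
  }

  static final class Registry {
    private final List<QueryEventListener> eventListeners;

    Registry(List<QueryEventListener> eventListeners) {
      // Defensive copy so later changes to the caller's list do not affect the registry.
      this.eventListeners = new ArrayList<>(eventListeners);
    }

    // Calls onCreate on every listener, in registration order.
    void notifyCreate(String queryId) {
      for (QueryEventListener listener : eventListeners) {
        listener.onCreate(queryId);
      }
    }
  }

  public static void main(String[] args) {
    List<QueryEventListener> listeners = new ArrayList<>();
    listeners.add(id -> System.out.println("metrics listener saw " + id));
    listeners.add(id -> System.out.println("logging listener saw " + id));
    new Registry(listeners).notifyCreate("query_1");
  }
}
```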

notifyCreate is used when:

updateStreamsPropertiesAndRestartRuntime

void updateStreamsPropertiesAndRestartRuntime(
  KsqlConfig config,
  ProcessingLogContext logContext)

updateStreamsPropertiesAndRestartRuntime is part of the QueryRegistry abstraction.


updateStreamsPropertiesAndRestartRuntime...FIXME

updateStreamsProperties

void updateStreamsProperties(
  SharedKafkaStreamsRuntime stream,
  KsqlConfig config,
  ProcessingLogContext logContext)

updateStreamsProperties...FIXME