
QueryRegistryImpl

Creating Instance

QueryRegistryImpl takes the following to be created:

QueryRegistryImpl is created when:

QueryBuilderFactory

QueryRegistryImpl is given a QueryBuilderFactory or defines a function to create a QueryBuilder when created.

Creating Transient Query

TransientQueryMetadata createTransientQuery(
  SessionConfig config,
  ServiceContext serviceContext,
  ProcessingLogContext processingLogContext,
  MetaStore metaStore,
  String statementText,
  QueryId queryId,
  Set<SourceName> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  LogicalSchema schema,
  OptionalInt limit,
  Optional<WindowInfo> windowInfo,
  boolean excludeTombstones)

createTransientQuery requests the QueryBuilderFactory for a QueryBuilder.

createTransientQuery requests the QueryBuilder to build a transient query with a new StreamsBuilder (Kafka Streams), which gives a TransientQueryMetadata.

createTransientQuery requests the TransientQueryMetadata to initialize.

createTransientQuery registers the transient query (registerTransientQuery) and returns the TransientQueryMetadata.
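The steps above can be sketched roughly as follows. This is a minimal illustration, not the ksqlDB source: TransientQuerySketch, TransientQuery, Builder, Registry and the liveQueries map are hypothetical stand-ins, the many real parameters are collapsed into a single query ID string, and what registration actually stores is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins; these are NOT the real ksqlDB classes.
class TransientQuerySketch {

  // Stand-in for TransientQueryMetadata: tracks only its ID and initialization state.
  static final class TransientQuery {
    final String queryId;
    boolean initialized;

    TransientQuery(String queryId) {
      this.queryId = queryId;
    }

    void initialize() {
      initialized = true;
    }
  }

  // Stand-in for QueryBuilder.
  interface Builder {
    TransientQuery buildTransientQuery(String queryId);
  }

  static final class Registry {
    // Stand-in for the QueryBuilderFactory.
    private final Supplier<Builder> builderFactory;
    // Assumption: registering a transient query means remembering it by ID.
    final Map<String, TransientQuery> liveQueries = new LinkedHashMap<>();

    Registry(Supplier<Builder> builderFactory) {
      this.builderFactory = builderFactory;
    }

    TransientQuery createTransientQuery(String queryId) {
      Builder builder = builderFactory.get();                       // request a builder from the factory
      TransientQuery query = builder.buildTransientQuery(queryId);  // build the transient query
      query.initialize();                                           // initialize it
      registerTransientQuery(query);                                // register it
      return query;                                                 // hand it back to the caller
    }

    private void registerTransientQuery(TransientQuery query) {
      liveQueries.put(query.queryId, query);
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry(() -> id -> new TransientQuery(id));
    TransientQuery query = registry.createTransientQuery("transient_42");
    System.out.println(query.queryId
        + " initialized=" + query.initialized
        + " registered=" + registry.liveQueries.containsKey(query.queryId));
  }
}
```

The point of the sketch is the ordering: the query is initialized before it is registered, so anything that looks it up in the registry sees an initialized query.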


createTransientQuery is part of the QueryRegistry abstraction.

Creating or Replacing Persistent Query

PersistentQueryMetadata createOrReplacePersistentQuery(
  SessionConfig config,
  ServiceContext serviceContext,
  ProcessingLogContext processingLogContext,
  MetaStore metaStore,
  String statementText,
  QueryId queryId,
  Optional<DataSource> sinkDataSource,
  Set<DataSource> sources,
  ExecutionStep<?> physicalPlan,
  String planSummary,
  KsqlConstants.PersistentQueryType persistentQueryType,
  Optional<String> sharedRuntimeId)

createOrReplacePersistentQuery requests the QueryBuilderFactory for a QueryBuilder to build a persistent query in a shared runtime (when the given sharedRuntimeId is available) or in a dedicated runtime (when it is not).
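The choice of runtime can be pictured as a simple check on the optional ID. The sketch below is illustrative only: RuntimeSelectionSketch, RuntimeKind and selectRuntime are hypothetical names, and the real method builds a query rather than returning an enum.

```java
import java.util.Optional;

// Hypothetical illustration; not the real ksqlDB code.
class RuntimeSelectionSketch {

  enum RuntimeKind { SHARED, DEDICATED }

  // A present sharedRuntimeId selects the shared runtime, an empty one the dedicated runtime.
  static RuntimeKind selectRuntime(Optional<String> sharedRuntimeId) {
    return sharedRuntimeId.isPresent() ? RuntimeKind.SHARED : RuntimeKind.DEDICATED;
  }

  public static void main(String[] args) {
    System.out.println(selectRuntime(Optional.of("runtime-0"))); // SHARED
    System.out.println(selectRuntime(Optional.empty()));         // DEDICATED
  }
}
```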

In the end, createOrReplacePersistentQuery registers the persistent query.


createOrReplacePersistentQuery is part of the QueryRegistry abstraction.

registerPersistentQuery

void registerPersistentQuery(
  ServiceContext serviceContext,
  MetaStore metaStore,
  PersistentQueryMetadata persistentQuery)

registerPersistentQuery takes the QueryId from the given PersistentQueryMetadata.

registerPersistentQuery requests the given PersistentQueryMetadata to initialize when this is a new query (a new QueryId) or the old query is not sandboxed.

registerPersistentQuery adds the QueryId with the PersistentQueryMetadata to the persistentQueries registry.

registerPersistentQuery registers the persistent query based on the type:

  • For CREATE_SOURCE, the single source name with the query ID in the createAsQueries registry

  • For CREATE_AS, the sink name with the query ID in the createAsQueries registry

  • For INSERT, all the sink and source names with the query ID in the insertQueries registry

registerPersistentQuery adds the QueryId with the PersistentQueryMetadata to the allLiveQueries registry.

In the end, registerPersistentQuery notifies event listeners.
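The per-type bookkeeping described above can be sketched as follows. This is a simplified model under stated assumptions: names and query IDs are plain strings, the value types of the createAsQueries and insertQueries maps are guesses made for illustration, and PersistentRegistrationSketch with its nested types is hypothetical. Initialization and listener notification are left out to keep the focus on the registries.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model; NOT the real ksqlDB classes.
class PersistentRegistrationSketch {

  enum QueryType { CREATE_SOURCE, CREATE_AS, INSERT }

  // Stand-in for PersistentQueryMetadata.
  static final class PersistentQuery {
    final String queryId;
    final QueryType type;
    final Optional<String> sinkName;
    final Set<String> sourceNames;

    PersistentQuery(String queryId, QueryType type,
                    Optional<String> sinkName, Set<String> sourceNames) {
      this.queryId = queryId;
      this.type = type;
      this.sinkName = sinkName;
      this.sourceNames = sourceNames;
    }
  }

  static final class Registry {
    final Map<String, PersistentQuery> persistentQueries = new HashMap<>();
    // Assumed shape: source or sink name -> ID of the query that created it.
    final Map<String, String> createAsQueries = new HashMap<>();
    // Assumed shape: source or sink name -> IDs of INSERT queries touching it.
    final Map<String, Set<String>> insertQueries = new HashMap<>();
    final Map<String, PersistentQuery> allLiveQueries = new HashMap<>();

    void registerPersistentQuery(PersistentQuery query) {
      persistentQueries.put(query.queryId, query);
      switch (query.type) {
        case CREATE_SOURCE:
          // The single source name is registered with the query ID.
          createAsQueries.put(query.sourceNames.iterator().next(), query.queryId);
          break;
        case CREATE_AS:
          // The sink name is registered with the query ID.
          createAsQueries.put(sinkOf(query), query.queryId);
          break;
        case INSERT:
          // The sink and every source name are registered with the query ID.
          addInsertQuery(sinkOf(query), query.queryId);
          for (String source : query.sourceNames) {
            addInsertQuery(source, query.queryId);
          }
          break;
        default:
          throw new IllegalArgumentException("Unknown query type: " + query.type);
      }
      allLiveQueries.put(query.queryId, query);
    }

    private static String sinkOf(PersistentQuery query) {
      return query.sinkName.orElseThrow(
          () -> new IllegalStateException("No sink for " + query.queryId));
    }

    private void addInsertQuery(String name, String queryId) {
      insertQueries.computeIfAbsent(name, k -> new HashSet<>()).add(queryId);
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    registry.registerPersistentQuery(new PersistentQuery(
        "CSAS_SINK_1", QueryType.CREATE_AS, Optional.of("SINK"), Collections.singleton("SRC")));
    registry.registerPersistentQuery(new PersistentQuery(
        "INSERTQUERY_2", QueryType.INSERT, Optional.of("SINK"), Collections.singleton("SRC")));
    System.out.println("createAsQueries=" + registry.createAsQueries);
    System.out.println("insertQueries=" + registry.insertQueries);
  }
}
```

In this model a name maps to at most one creating query but to any number of INSERT queries, which is why the second registry holds a set per name.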

Notifying QueryEventListeners about Create Queries

void notifyCreate(
  ServiceContext serviceContext,
  MetaStore metaStore,
  QueryMetadata queryMetadata)

notifyCreate requests the QueryEventListeners to onCreate.
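The fan-out to listeners might look like the sketch below. It is only an illustration: NotifyCreateSketch, Listener and Registry are hypothetical, and the listener callback takes just a query ID string here instead of the ServiceContext, MetaStore and QueryMetadata from the signature above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the real ksqlDB code.
class NotifyCreateSketch {

  // Stand-in for QueryEventListener, reduced to a single simplified callback.
  interface Listener {
    void onCreate(String queryId);
  }

  static final class Registry {
    private final List<Listener> eventListeners;

    Registry(List<Listener> eventListeners) {
      // Defensive copy so later changes to the caller's list do not affect the registry.
      this.eventListeners = new ArrayList<>(eventListeners);
    }

    // Calls onCreate on every registered listener, in registration order.
    void notifyCreate(String queryId) {
      for (Listener listener : eventListeners) {
        listener.onCreate(queryId);
      }
    }
  }

  public static void main(String[] args) {
    List<Listener> listeners = new ArrayList<>();
    listeners.add(id -> System.out.println("metrics listener saw " + id));
    listeners.add(id -> System.out.println("audit listener saw " + id));
    new Registry(listeners).notifyCreate("CSAS_SINK_1");
  }
}
```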

notifyCreate is used when:
