Skip to content


QueryMetadataImpl is a QueryMetadata.

QueryMetadataImpl is a thin wrapper around KafkaStreams.

Creating Instance

QueryMetadataImpl takes the following to be created:

  • Statement Text
  • LogicalSchema
  • Source Names
  • Execution Plan
  • Query Application ID
  • Topology
  • KafkaStreamsBuilder
  • Streams Properties
  • overriddenProperties
  • closeTimeout
  • QueryId
  • QueryErrorClassifier
  • maxQueryErrorsQueueSize
  • baseWaitingTimeMs
  • retryBackoffMaxMs
  • Listener


QueryMetadataImpl is given a Topology (Kafka Streams) when created.

The Topology is used when:


Topology getTopology()

getTopology is part of the QueryMetadata abstraction.

getTopology returns the Topology instance.


QueryMetadataImpl is given a KafkaStreams (Kafka Streams) when created and requested to resetKafkaStreams.

The KafkaStreams is started at start and closed at close.

The KafkaStreams is used when:


KafkaStreams getKafkaStreams()

getKafkaStreams is part of the QueryMetadata abstraction.

getKafkaStreams returns the KafkaStreams instance.


QueryMetadataImpl is given a KafkaStreamsBuilder when created.


void initialize()

initialize is part of the QueryMetadata abstraction.

initialize requests the KafkaStreamsBuilder to build a KafkaStreams instance (with the Topology and the streamsProperties).

initialize resets the KafkaStreams instance and turns the initialized flag on.

Query Type

KsqlQueryType getQueryType()

getQueryType is part of the QueryMetadata abstraction.

getQueryType is KsqlQueryType.PERSISTENT.

Closing Query

void close()

close is part of the QueryMetadata abstraction.

close requests the loggerFactory for...FIXME

close doClose (with cleanUp enabled).

In the end, close requests the Listener to onClose.


void doClose(
  boolean cleanUp)

doClose closeKafkaStreams and then requests the KafkaStreams to cleanUp.

doClose prints out the following WARN message to the logs when closeKafkaStreams did not succeed:

Query has not successfully closed, skipping cleanup

doClose is used when:

  • PersistentQueryMetadataImpl is requested to stop (with cleanUp disabled)
  • QueryMetadataImpl is requested to close (with cleanUp enabled)


boolean closeKafkaStreams()