QueryMetadataImpl

QueryMetadataImpl is a QueryMetadata.

QueryMetadataImpl is a thin wrapper around KafkaStreams.

Creating Instance

QueryMetadataImpl takes the following to be created:

  • Statement Text
  • LogicalSchema
  • Source Names
  • Execution Plan
  • Query Application ID
  • Topology
  • KafkaStreamsBuilder
  • Streams Properties
  • overriddenProperties
  • closeTimeout
  • QueryId
  • QueryErrorClassifier
  • maxQueryErrorsQueueSize
  • baseWaitingTimeMs
  • retryBackoffMaxMs
  • Listener

Topology

QueryMetadataImpl is given a Topology (Kafka Streams) when created.

The Topology is used when:

getTopology

Topology getTopology()

getTopology is part of the QueryMetadata abstraction.


getTopology returns the Topology instance.

KafkaStreams

QueryMetadataImpl is given a KafkaStreams (Kafka Streams) when created and requested to resetKafkaStreams.

The KafkaStreams is started at start and closed at close.

The KafkaStreams is used when:

getKafkaStreams

KafkaStreams getKafkaStreams()

getKafkaStreams is part of the QueryMetadata abstraction.


getKafkaStreams returns the KafkaStreams instance.

KafkaStreamsBuilder

QueryMetadataImpl is given a KafkaStreamsBuilder when created.

initialize

void initialize()

initialize is part of the QueryMetadata abstraction.


initialize requests the KafkaStreamsBuilder to build a KafkaStreams instance (with the Topology and the streamsProperties).

initialize resets the KafkaStreams instance and turns the initialized flag on.
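The initialize flow can be sketched as follows. This is a minimal illustration, not the ksqlDB source: Topology, KafkaStreams and KafkaStreamsBuilder are hypothetical stand-ins so the example compiles without Kafka on the classpath, and the build method name, the isInitialized accessor and the body of resetKafkaStreams are assumptions.

```java
import java.util.Map;

final class InitializeSketch {
    /** Hypothetical stand-in for org.apache.kafka.streams.Topology. */
    static final class Topology {}

    /** Hypothetical stand-in for org.apache.kafka.streams.KafkaStreams. */
    static final class KafkaStreams {
        final Topology topology;
        final Map<String, Object> properties;

        KafkaStreams(Topology topology, Map<String, Object> properties) {
            this.topology = topology;
            this.properties = properties;
        }
    }

    /** Hypothetical stand-in for KafkaStreamsBuilder; the method name is assumed. */
    interface KafkaStreamsBuilder {
        KafkaStreams build(Topology topology, Map<String, Object> properties);
    }

    static final class Query {
        private final Topology topology;
        private final Map<String, Object> streamsProperties;
        private final KafkaStreamsBuilder kafkaStreamsBuilder;
        private KafkaStreams kafkaStreams; // stays null until initialize runs
        private boolean initialized;

        Query(Topology topology,
              Map<String, Object> streamsProperties,
              KafkaStreamsBuilder kafkaStreamsBuilder) {
            this.topology = topology;
            this.streamsProperties = streamsProperties;
            this.kafkaStreamsBuilder = kafkaStreamsBuilder;
        }

        void initialize() {
            // Step 1: ask the builder for a KafkaStreams (topology + properties).
            KafkaStreams built = kafkaStreamsBuilder.build(topology, streamsProperties);
            // Step 2: reset the held KafkaStreams instance to the new one.
            resetKafkaStreams(built);
            // Step 3: turn the initialized flag on.
            initialized = true;
        }

        // Assumed to simply swap the reference; the real method may do more.
        private void resetKafkaStreams(KafkaStreams streams) {
            this.kafkaStreams = streams;
        }

        KafkaStreams getKafkaStreams() {
            return kafkaStreams;
        }

        boolean isInitialized() {
            return initialized;
        }
    }
}
```

In this sketch the KafkaStreams instance does not exist until initialize is called, which is why getKafkaStreams returns null on a freshly created Query.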

Query Type

KsqlQueryType getQueryType()

getQueryType is part of the QueryMetadata abstraction.


getQueryType returns KsqlQueryType.PERSISTENT.

Closing Query

void close()

close is part of the QueryMetadata abstraction.


close requests the loggerFactory for...FIXME

close calls doClose (with cleanUp enabled).

In the end, close requests the Listener to onClose.

doClose

void doClose(
  boolean cleanUp)

doClose calls closeKafkaStreams and, when that succeeds, requests the KafkaStreams to cleanUp.

doClose prints out the following WARN message to the logs when closeKafkaStreams did not succeed:

Query has not successfully closed, skipping cleanup

doClose is used when:

  • PersistentQueryMetadataImpl is requested to stop (with cleanUp disabled)
  • QueryMetadataImpl is requested to close (with cleanUp enabled)
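The relationship between close, stop and doClose can be sketched as below. This is an illustration under stated assumptions, not the actual implementation: Streams and Listener are hypothetical stand-ins, closeKafkaStreams is assumed to report success as a boolean by delegating to the streams instance, the cleanUp flag is assumed to guard the cleanUp call, and an in-memory list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseSketch {
    /** Hypothetical stand-in for the parts of KafkaStreams used on shutdown. */
    interface Streams {
        boolean close();   // true when the streams instance shut down in time
        void cleanUp();    // removes local state
    }

    /** Hypothetical stand-in for the Listener given at creation time. */
    interface Listener {
        void onClose();
    }

    static final class Query {
        final List<String> log = new ArrayList<>(); // stands in for the logger
        private final Streams streams;
        private final Listener listener;

        Query(Streams streams, Listener listener) {
            this.streams = streams;
            this.listener = listener;
        }

        /** close: doClose with cleanUp enabled, then notify the listener. */
        void close() {
            doClose(true);
            listener.onClose();
        }

        /** Mirrors the stop path described above: doClose with cleanUp disabled. */
        void stop() {
            doClose(false);
        }

        void doClose(boolean cleanUp) {
            boolean closed = closeKafkaStreams();
            if (!closed) {
                // Cleaning up state of a still-running instance is skipped.
                log.add("WARN Query has not successfully closed, skipping cleanup");
                return;
            }
            if (cleanUp) {
                streams.cleanUp();
            }
        }

        // Assumption: simply delegates and returns whether the close succeeded.
        boolean closeKafkaStreams() {
            return streams.close();
        }
    }
}
```

Under these assumptions, stop leaves local state in place so the query can be restarted later, while close removes it, and a failed shutdown never triggers cleanup in either path.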

closeKafkaStreams

boolean closeKafkaStreams()

closeKafkaStreams...FIXME