Skip to content

KsqlContext

KsqlContext is part of ksqlDB embedded mode for developers to use the ksqlDB engine embedded in their applications to execute ksql statements.

Creating Instance

KsqlContext takes the following to be created:

KsqlContext is created using create utility.

KsqlEngine

KsqlContext is given a KsqlEngine when created.

The KsqlEngine is active until close.

The KsqlEngine is used when:

Creating KsqlContext

KsqlContext create(
  KsqlConfig ksqlConfig,
  ProcessingLogContext processingLogContext)

create creates a ServiceContext (with the given KsqlConfig).

create creates a UserFunctionLoader to load UDFs.

create creates a ServiceInfo.

create creates a KsqlEngine (with a new SequentialQueryIdGenerator).

In the end, create creates a KsqlContext (with the DEFAULT injectors).

Executing SQL Text

List<QueryMetadata> sql(
  String sql) // (1)!
List<QueryMetadata> sql(
  String sql,
  Map<String, ?> overriddenProperties)
  1. Uses an empty overriddenProperties collection

sql requests the KsqlEngine to parse the sql (into a collection of prepared statements).

sql requests the KsqlEngine to create a sandbox to execute the statements within (one by one).

sql executes the (prepared) statements (one by one).

sql requests persistent queries to start (if there are any).

sql prints out the following WARN message to the logs for all other non-persistent queries:

Ignoring statement: [sql]
Only CREATE statements can run in KSQL embedded mode.

In the end, sql returns all queriess (persistent and non-persistent).

Executing Statement

ExecuteResult execute(
  KsqlExecutionContext executionContext,
  ParsedStatement stmt,
  KsqlConfig ksqlConfig,
  Map<String, Object> mutableSessionPropertyOverrides,
  Injector injector)

execute...FIXME

Demo

import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)

import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()

import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()

import io.confluent.ksql.embedded.KsqlContext
val ksqlContext = KsqlContext.create(
  ksqlConfig,
  processingLogContext,
  metricCollectors)

Logging

Enable ALL logging level for io.confluent.ksql.embedded.KsqlContext logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.io.confluent.ksql.embedded.KsqlContext=ALL

Refer to Logging.