KsqlContext¶
KsqlContext is part of ksqlDB's embedded mode, which lets developers embed the ksqlDB engine in their applications to execute KSQL statements.
Creating Instance¶
KsqlContext takes the following to be created:
KsqlContext is created using the create utility.
KsqlEngine¶
KsqlContext is given a KsqlEngine when created.
The KsqlEngine stays active until close is called.
The KsqlEngine is used when:
Creating KsqlContext¶
KsqlContext create(
  KsqlConfig ksqlConfig,
  ProcessingLogContext processingLogContext,
  MetricCollectors metricCollectors)
create creates a ServiceContext (with the given KsqlConfig).
create creates a UserFunctionLoader to load UDFs.
create creates a ServiceInfo.
create creates a KsqlEngine (with a new SequentialQueryIdGenerator).
In the end, create creates a KsqlContext (with the DEFAULT injectors).
Executing SQL Text¶
List<QueryMetadata> sql(
  String sql) // (1)!
List<QueryMetadata> sql(
  String sql,
  Map<String, ?> overriddenProperties)
- Uses an empty overriddenProperties collection
sql requests the KsqlEngine to parse the sql (into a collection of prepared statements).
sql requests the KsqlEngine to create a sandbox to execute the statements within (one by one).
sql executes the (prepared) statements (one by one).
sql requests persistent queries to start (if there are any).
sql prints out the following WARN messages to the logs for every other (non-persistent) query:
Ignoring statement: [sql]
Only CREATE statements can run in KSQL embedded mode.
In the end, sql returns all queries (persistent and non-persistent).
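The final steps of sql (start persistent queries, warn about the rest, return everything) can be sketched as a small self-contained model. This is only an illustration of the control flow described above, not the ksqlDB implementation: SqlDispatchSketch, Query, and dispatch are hypothetical stand-ins, and the warnings are collected in a list rather than written to a logger.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the last steps of sql(). Every type and method here is a
// hypothetical stand-in for illustration; none of them are ksqlDB classes.
class SqlDispatchSketch {

  // Stand-in for the metadata of a query created by an executed statement.
  static final class Query {
    final String id;
    final boolean persistent;
    boolean started;

    Query(String id, boolean persistent) {
      this.id = id;
      this.persistent = persistent;
    }
  }

  // Starts persistent queries, collects the two warnings for every other query,
  // and returns the full list unchanged.
  static List<Query> dispatch(String sql, List<Query> queries, List<String> warnings) {
    for (Query query : queries) {
      if (query.persistent) {
        query.started = true;
      } else {
        warnings.add("Ignoring statement: " + sql);
        warnings.add("Only CREATE statements can run in KSQL embedded mode.");
      }
    }
    return queries;
  }

  public static void main(String[] args) {
    List<String> warnings = new ArrayList<>();
    List<Query> queries = List.of(
        new Query("persistent-query", true),
        new Query("transient-query", false));

    for (Query query : dispatch("SELECT * FROM s EMIT CHANGES;", queries, warnings)) {
      System.out.println(query.id + " started=" + query.started);
    }
    warnings.forEach(System.out::println);
  }
}
```

Note that the non-persistent query is still part of the returned list, so a caller has to check the query type itself if it only cares about the queries that were actually started.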
Executing Statement¶
ExecuteResult execute(
  KsqlExecutionContext executionContext,
  ParsedStatement stmt,
  KsqlConfig ksqlConfig,
  Map<String, Object> mutableSessionPropertyOverrides,
  Injector injector)
execute...FIXME
Demo¶
import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)
import io.confluent.ksql.logging.processing.ProcessingLogContext
val processingLogContext = ProcessingLogContext.create()
import io.confluent.ksql.metrics.MetricCollectors
val metricCollectors = new MetricCollectors()
import io.confluent.ksql.embedded.KsqlContext
val ksqlContext = KsqlContext.create(
  ksqlConfig,
  processingLogContext,
  metricCollectors)
Logging¶
Enable the ALL logging level for the io.confluent.ksql.embedded.KsqlContext logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.embedded.KsqlContext=ALL
Refer to Logging.