KsqlContext¶
KsqlContext is part of ksqlDB embedded mode, which lets developers embed the ksqlDB engine in their applications to execute ksql statements.
Creating Instance¶
KsqlContext takes the following to be created:
KsqlContext is created using the create utility.
KsqlEngine¶
KsqlContext is given a KsqlEngine when created.
The KsqlEngine is active until close.
The KsqlEngine is used when:
Creating KsqlContext¶
KsqlContext create(
  KsqlConfig ksqlConfig,
  ProcessingLogContext processingLogContext,
  MetricCollectors metricCollectors)
create creates a ServiceContext (with the given KsqlConfig).
create creates a UserFunctionLoader to load UDFs.
create creates a ServiceInfo.
create creates a KsqlEngine (with a new SequentialQueryIdGenerator).
In the end, create creates a KsqlContext (with the DEFAULT injectors).
Executing SQL Text¶
List<QueryMetadata> sql(
  String sql) // uses an empty overriddenProperties collection
List<QueryMetadata> sql(
  String sql,
  Map<String, ?> overriddenProperties)
sql requests the KsqlEngine to parse the sql (into a collection of prepared statements).
sql requests the KsqlEngine to create a sandbox and executes the statements within it first (one by one).
sql then executes the (prepared) statements against the KsqlEngine itself (one by one).
sql requests persistent queries to start (if there are any).
sql prints out the following WARN message to the logs for all other (non-persistent) queries:
Ignoring statement: [sql]
Only CREATE statements can run in KSQL embedded mode.
In the end, sql returns all queries (persistent and non-persistent).
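The steps above can be sketched as a small, self-contained model. This is not ksqlDB code: SqlFlowSketch, Engine and Query are hypothetical stand-ins written for this illustration, the ';'-splitting parser is a toy, and the rule that decides which statements yield persistent or non-persistent queries is an assumption of the model. It also assumes the sandbox pass acts as a dry run before the statements are executed for real.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Simplified model of the sql steps. Every type here is a hypothetical
// stand-in written for this sketch, not a ksqlDB class.
public class SqlFlowSketch {

  // Stand-in for QueryMetadata.
  static final class Query {
    final String statement;
    final boolean persistent;
    boolean started = false;

    Query(String statement, boolean persistent) {
      this.statement = statement;
      this.persistent = persistent;
    }
  }

  // Stand-in for the engine; a sandbox is modeled as a separate instance.
  static final class Engine {
    final List<String> executed = new ArrayList<>();

    // Toy parser: splits on ';' and drops blank fragments.
    List<String> parse(String sql) {
      List<String> statements = new ArrayList<>();
      for (String part : sql.split(";")) {
        if (!part.trim().isEmpty()) {
          statements.add(part.trim());
        }
      }
      return statements;
    }

    Engine createSandbox() {
      return new Engine();
    }

    // Assumption of this model: CREATE ... AS SELECT yields a persistent query,
    // a bare SELECT yields a non-persistent one, anything else yields no query.
    Optional<Query> execute(String statement) {
      executed.add(statement);
      String upper = statement.toUpperCase(Locale.ROOT);
      if (upper.startsWith("CREATE") && upper.contains(" AS SELECT")) {
        return Optional.of(new Query(statement, true));
      }
      if (upper.startsWith("SELECT")) {
        return Optional.of(new Query(statement, false));
      }
      return Optional.empty();
    }
  }

  final Engine engine = new Engine();
  final List<String> warnings = new ArrayList<>();

  // The overload without overrides delegates with an empty map.
  List<Query> sql(String sql) {
    return sql(sql, Collections.emptyMap());
  }

  // overriddenProperties is accepted for shape only; this model ignores it.
  List<Query> sql(String sql, Map<String, ?> overriddenProperties) {
    List<String> statements = engine.parse(sql);

    // Dry run: execute every statement in a sandbox first.
    Engine sandbox = engine.createSandbox();
    for (String statement : statements) {
      sandbox.execute(statement);
    }

    // Real run: execute again and collect the queries that were produced.
    List<Query> queries = new ArrayList<>();
    for (String statement : statements) {
      engine.execute(statement).ifPresent(queries::add);
    }

    // Start persistent queries; record a warning for everything else.
    for (Query query : queries) {
      if (query.persistent) {
        query.started = true;
      } else {
        warnings.add("Ignoring statement: " + sql);
        warnings.add("Only CREATE statements can run in KSQL embedded mode.");
      }
    }
    return queries;
  }
}
```

In this model, a non-persistent query is still part of the returned list; it is simply never started, which mirrors the "returns all queries" step.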
Executing Statement¶
ExecuteResult execute(
  KsqlExecutionContext executionContext,
  ParsedStatement stmt,
  KsqlConfig ksqlConfig,
  Map<String, Object> mutableSessionPropertyOverrides,
  Injector injector)
execute ...FIXME
Demo¶
import io.confluent.ksql.embedded.KsqlContext
import io.confluent.ksql.logging.processing.ProcessingLogContext
import io.confluent.ksql.metrics.MetricCollectors
import io.confluent.ksql.util.KsqlConfig
import scala.jdk.CollectionConverters._

// KsqlConfig is built from a java.util.Map, hence the asJava conversion
val props = Map("bootstrap.servers" -> ":9092").asJava
val ksqlConfig = new KsqlConfig(props)

val processingLogContext = ProcessingLogContext.create()
val metricCollectors = new MetricCollectors()

// create wires up the engine and returns a ready-to-use KsqlContext
val ksqlContext = KsqlContext.create(
  ksqlConfig,
  processingLogContext,
  metricCollectors)
Logging¶
Enable ALL logging level for io.confluent.ksql.embedded.KsqlContext logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.io.confluent.ksql.embedded.KsqlContext=ALL
Refer to Logging.