DistributingExecutor

DistributingExecutor is created for RequestHandler (to execute KSQL statements).

When requested to execute a statement, DistributingExecutor uses a transactional Kafka producer to enqueue the command (to the CommandQueue).

Creating Instance

DistributingExecutor takes the following to be created:

DistributingExecutor is created when:

CommandIdAssigner

DistributingExecutor creates a CommandIdAssigner when created.

The CommandIdAssigner is used to generate a CommandId to execute a KSQL statement.

Executing Statement

StatementExecutorResponse execute(
  ConfiguredStatement<? extends Statement> statement,
  KsqlExecutionContext executionContext,
  KsqlSecurityContext securityContext)

execute requests the injectorFactory to inject into the given ConfiguredStatement.

For an InsertInto statement, execute runs validateInsertIntoQueries.

execute runs checkIfNotExistsResponse.

execute runs checkAuthorization.

execute requests the CommandQueue for a transactional Kafka producer (Producer<CommandId, Command> to produce to the command topic).

execute initializes transactions (using Kafka's Producer.initTransactions).

execute starts a transaction (using Kafka's Producer.beginTransaction).

execute requests the CommandQueue to waitForCommandConsumer (to let it process all available commands).

execute...FIXME

execute requests the CommandQueue to enqueue the command and commits the transaction (using Kafka's Producer.commitTransaction).

execute waits up to distributedCmdResponseTimeout for the final status (of executing the command).

In the end, execute creates a StatementExecutorResponse (as handled) and closes the transactional Kafka producer.
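The order of the transactional steps above can be illustrated with a minimal, self-contained sketch. It does not use the real Kafka or ksqlDB classes: TransactionalProducer and CommandQueueSketch are hypothetical stand-ins that only record the calls made on them, and createTransactionalProducer and enqueueCommand are assumed method names. The FIXME step and the wait for the final status are left out, and aborting the transaction on failure is an assumption that the description above does not cover.

```java
import java.util.ArrayList;
import java.util.List;

final class EnqueueFlowSketch {

  // Hypothetical stand-in for the transactional subset of Kafka's Producer.
  interface TransactionalProducer extends AutoCloseable {
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
    @Override void close();
  }

  // Hypothetical stand-in for the CommandQueue operations that execute relies on.
  interface CommandQueueSketch {
    TransactionalProducer createTransactionalProducer(); // assumed name
    void waitForCommandConsumer();
    void enqueueCommand(String command, TransactionalProducer producer); // assumed name
  }

  // In-memory queue that records every call so the ordering can be inspected.
  static final class RecordingQueue implements CommandQueueSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public TransactionalProducer createTransactionalProducer() {
      calls.add("createTransactionalProducer");
      return new TransactionalProducer() {
        @Override public void initTransactions() { calls.add("initTransactions"); }
        @Override public void beginTransaction() { calls.add("beginTransaction"); }
        @Override public void commitTransaction() { calls.add("commitTransaction"); }
        @Override public void abortTransaction() { calls.add("abortTransaction"); }
        @Override public void close() { calls.add("close"); }
      };
    }

    @Override
    public void waitForCommandConsumer() {
      calls.add("waitForCommandConsumer");
    }

    @Override
    public void enqueueCommand(String command, TransactionalProducer producer) {
      calls.add("enqueueCommand:" + command);
    }
  }

  // Mirrors the step order described above: init, begin, wait, enqueue, commit, close.
  static void enqueue(CommandQueueSketch queue, String command) {
    TransactionalProducer producer = queue.createTransactionalProducer();
    try {
      producer.initTransactions();
      producer.beginTransaction();
      try {
        queue.waitForCommandConsumer();
        queue.enqueueCommand(command, producer);
        producer.commitTransaction();
      } catch (RuntimeException e) {
        // Assumption: roll back on any failure (a simplification for this sketch).
        producer.abortTransaction();
        throw e;
      }
    } finally {
      // The producer is always closed, whether or not the commit succeeded.
      producer.close();
    }
  }

  public static void main(String[] args) {
    RecordingQueue queue = new RecordingQueue();
    enqueue(queue, "CREATE STREAM ...");
    queue.calls.forEach(System.out::println);
  }
}
```

Running main prints the recorded calls in the order they were made, which makes it easy to compare the sequence against the steps listed above.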


execute is used when:

validateInsertIntoQueries

void validateInsertIntoQueries(
  MetaStore metaStore,
  InsertInto insertInto)

validateInsertIntoQueries throws a KsqlException when the given MetaStore has no DataSource for the target of the given InsertInto:

Cannot insert into an unknown stream: [target]

validateInsertIntoQueries throws a KsqlException when the underlying topic (of the DataSource) is read-only:

Cannot insert into read-only topic: [topic]

validateInsertIntoQueries throws a KsqlException when the DataSource has header columns:

Cannot insert into [target] because it has header columns
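The three checks can be sketched as follows. This is a simplified illustration, not the ksqlDB implementation: the MetaStore is modelled as a plain Map, and DataSourceSketch and KsqlExceptionSketch are hypothetical stand-ins for the real types. How ksqlDB decides that a topic is read-only or that a source has header columns is not shown; both are reduced to boolean flags here.

```java
import java.util.HashMap;
import java.util.Map;

final class InsertIntoValidationSketch {

  // Hypothetical stand-in for KsqlException.
  static final class KsqlExceptionSketch extends RuntimeException {
    KsqlExceptionSketch(String message) {
      super(message);
    }
  }

  // Hypothetical, heavily simplified view of a DataSource.
  static final class DataSourceSketch {
    final String topic;
    final boolean readOnlyTopic;
    final boolean hasHeaderColumns;

    DataSourceSketch(String topic, boolean readOnlyTopic, boolean hasHeaderColumns) {
      this.topic = topic;
      this.readOnlyTopic = readOnlyTopic;
      this.hasHeaderColumns = hasHeaderColumns;
    }
  }

  // Applies the checks in the order described above; the first failing check throws.
  static void validateInsertInto(Map<String, DataSourceSketch> metaStore, String target) {
    DataSourceSketch source = metaStore.get(target);
    if (source == null) {
      throw new KsqlExceptionSketch("Cannot insert into an unknown stream: " + target);
    }
    if (source.readOnlyTopic) {
      throw new KsqlExceptionSketch("Cannot insert into read-only topic: " + source.topic);
    }
    if (source.hasHeaderColumns) {
      throw new KsqlExceptionSketch(
          "Cannot insert into " + target + " because it has header columns");
    }
  }

  public static void main(String[] args) {
    Map<String, DataSourceSketch> metaStore = new HashMap<>();
    metaStore.put("ORDERS", new DataSourceSketch("orders", false, false));

    validateInsertInto(metaStore, "ORDERS"); // passes all three checks
    try {
      validateInsertInto(metaStore, "MISSING");
    } catch (KsqlExceptionSketch e) {
      System.out.println(e.getMessage());
    }
  }
}
```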