DistributingExecutor¶
DistributingExecutor is created for RequestHandler (to execute KSQL statements).

When requested to execute a statement, DistributingExecutor uses a transactional Kafka producer to enqueue the command (to the CommandQueue).
Creating Instance¶
DistributingExecutor takes the following to be created:
- KsqlConfig
- CommandQueue
- ksql.server.command.response.timeout.ms
- Injector factory (for a KsqlExecutionContext and ServiceContext)
- ValidatedCommandFactory
- Error Handler
- CommandRunner Warning (
Supplier<String>)
DistributingExecutor is created when:
KsqlResourceis requested to configure (and creates a RequestHandler)
CommandIdAssigner¶
DistributingExecutor creates a CommandIdAssigner when created.
The CommandIdAssigner is used to generate a CommandId to execute a KSQL statement.
Executing Statement¶
StatementExecutorResponse execute(
ConfiguredStatement<? extends Statement> statement,
KsqlExecutionContext executionContext,
KsqlSecurityContext securityContext)
execute requests the injectorFactory to inject into the given ConfiguredStatement.
For a InsertInto statement, execute validateInsertIntoQueries.
execute checkIfNotExistsResponse.
execute checkAuthorization.
execute requests the CommandQueue for a transactional Kafka producer (Producer<CommandId, Command> to produce to the command topic).
execute initialize transactions (using Kafka's Producer.initTransactions).
execute starts a transaction (using Kafka's Producer.beginTransaction).
execute requests the CommandQueue to waitForCommandConsumer (to let it process all available commands).
execute...FIXME
execute requests the CommandQueue to enqueue the command and commits the transaction (using Kafka's Producer.commitTransaction).
execute waits for the final status (of executing the command) distributedCmdResponseTimeout time.
In the end, execute creates a StatementExecutorResponse (as handled) and closes the transactional Kafka producer.
execute is used when:
RequestHandleris requested to execute a SQL statement
validateInsertIntoQueries¶
void validateInsertIntoQueries(
MetaStore metaStore,
InsertInto insertInto)
validateInsertIntoQueries throws a KsqlException when the given MetaStore could not find a DataSource for the target of the given InsertInto.
Cannot insert into an unknown stream: [target]
validateInsertIntoQueries throws a KsqlException when the underlying topic (of the DataSource) is read-only:
Cannot insert into read-only topic: [topic]
validateInsertIntoQueries throws a KsqlException when the DataSource has header columns:
Cannot insert into [target] because it has header columns