DistributingExecutor¶
DistributingExecutor is created for RequestHandler (to execute KSQL statements).
When requested to execute a statement, DistributingExecutor uses a transactional Kafka producer to enqueue the command (to the CommandQueue).
Creating Instance¶
DistributingExecutor takes the following to be created:
- KsqlConfig
- CommandQueue
- ksql.server.command.response.timeout.ms
- Injector factory (for a KsqlExecutionContext and ServiceContext)
- ValidatedCommandFactory
- Error Handler
- CommandRunner Warning (Supplier<String>)
DistributingExecutor is created when:
- KsqlResource is requested to configure (and creates a RequestHandler)
CommandIdAssigner¶
DistributingExecutor creates a CommandIdAssigner when created.
The CommandIdAssigner is used to generate a CommandId to execute a KSQL statement.
Executing Statement¶
StatementExecutorResponse execute(
ConfiguredStatement<? extends Statement> statement,
KsqlExecutionContext executionContext,
KsqlSecurityContext securityContext)
execute requests the injectorFactory to inject into the given ConfiguredStatement.
For an InsertInto statement, execute calls validateInsertIntoQueries.
execute calls checkIfNotExistsResponse.
execute calls checkAuthorization.
execute requests the CommandQueue for a transactional Kafka producer (a Producer<CommandId, Command> to produce to the command topic).
execute initializes transactions (using Kafka's Producer.initTransactions).
execute starts a transaction (using Kafka's Producer.beginTransaction).
execute requests the CommandQueue to waitForCommandConsumer (to let it process all available commands).
execute ...FIXME
execute requests the CommandQueue to enqueue the command and commits the transaction (using Kafka's Producer.commitTransaction).
execute waits up to distributedCmdResponseTimeout for the final status (of executing the command).
In the end, execute creates a StatementExecutorResponse (as handled) and closes the transactional Kafka producer.
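The order of the producer-related steps above can be sketched in plain Java. This is a minimal illustration and not the ksqlDB implementation: the TransactionalProducer and CommandQueue interfaces, the enqueue method, and the Recording fake are all hypothetical stand-ins (the real code works with Kafka's Producer<CommandId, Command>), and aborting the transaction on failure is an assumption that the text above does not state.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalEnqueueSketch {

  /** Hypothetical stand-in for the transactional part of Kafka's Producer API. */
  interface TransactionalProducer {
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
    void close();
  }

  /** Hypothetical stand-in for the CommandQueue operations named in the text. */
  interface CommandQueue {
    TransactionalProducer createTransactionalProducer();
    void waitForCommandConsumer();
    void enqueueCommand(String command, TransactionalProducer producer);
  }

  /** Follows the order of steps described for execute. */
  static void enqueue(CommandQueue queue, String command) {
    TransactionalProducer producer = queue.createTransactionalProducer();
    try {
      producer.initTransactions();
      producer.beginTransaction();
      try {
        // Let the command consumer catch up before writing a new command.
        queue.waitForCommandConsumer();
        queue.enqueueCommand(command, producer);
        producer.commitTransaction();
      } catch (RuntimeException e) {
        // Assumption: a failed transaction is aborted before rethrowing.
        producer.abortTransaction();
        throw e;
      }
    } finally {
      // The producer is closed whether or not the commit succeeded.
      producer.close();
    }
  }

  /** In-memory fake that records the order of calls. */
  static final class Recording implements TransactionalProducer, CommandQueue {
    final List<String> calls = new ArrayList<>();
    private final boolean failOnEnqueue;

    Recording(boolean failOnEnqueue) {
      this.failOnEnqueue = failOnEnqueue;
    }

    public TransactionalProducer createTransactionalProducer() {
      calls.add("createProducer");
      return this;
    }

    public void initTransactions() { calls.add("initTransactions"); }
    public void beginTransaction() { calls.add("beginTransaction"); }
    public void commitTransaction() { calls.add("commitTransaction"); }
    public void abortTransaction() { calls.add("abortTransaction"); }
    public void close() { calls.add("close"); }
    public void waitForCommandConsumer() { calls.add("waitForCommandConsumer"); }

    public void enqueueCommand(String command, TransactionalProducer producer) {
      calls.add("enqueueCommand");
      if (failOnEnqueue) {
        throw new IllegalStateException("enqueue failed");
      }
    }
  }

  public static void main(String[] args) {
    Recording fake = new Recording(false);
    enqueue(fake, "CREATE STREAM ...");
    System.out.println(fake.calls);
  }
}
```

Running main prints the recorded call order, which makes it easy to check that the producer is always closed last, even when enqueueing fails.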
execute is used when:
- RequestHandler is requested to execute a SQL statement
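The timed wait for the final status that execute performs can be illustrated with a CompletableFuture. This is only a sketch under assumptions: the waitForFinalStatus helper, the String status values, and the choice to fall back to the last known status when the timeout expires are hypothetical and not taken from the text above.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FinalStatusWaitSketch {

  /**
   * Waits up to the given timeout for the final status.
   * Assumption: on timeout the last known status is returned instead of failing.
   */
  static String waitForFinalStatus(
      CompletableFuture<String> finalStatus,
      String currentStatus,
      Duration timeout) {
    try {
      return finalStatus.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The command has not finished in time; report what is known so far.
      return currentStatus;
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } catch (InterruptedException e) {
      // Preserve the interrupt flag for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> done = CompletableFuture.completedFuture("SUCCESS");
    System.out.println(waitForFinalStatus(done, "QUEUED", Duration.ofMillis(50)));

    CompletableFuture<String> pending = new CompletableFuture<>();
    System.out.println(waitForFinalStatus(pending, "QUEUED", Duration.ofMillis(50)));
  }
}
```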
validateInsertIntoQueries¶
void validateInsertIntoQueries(
MetaStore metaStore,
InsertInto insertInto)
validateInsertIntoQueries throws a KsqlException when the given MetaStore has no DataSource for the target of the given InsertInto:
Cannot insert into an unknown stream: [target]
validateInsertIntoQueries throws a KsqlException when the underlying topic (of the DataSource) is read-only:
Cannot insert into read-only topic: [topic]
validateInsertIntoQueries throws a KsqlException when the DataSource has header columns:
Cannot insert into [target] because it has header columns
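The three checks above can be sketched as follows. The types here are simplified, hypothetical stand-ins: a Map plays the role of the MetaStore, DataSource is reduced to the fields the checks need, and KsqlException is a local class rather than ksqlDB's own. The order of the checks follows the order in which they are listed above, which is an assumption about the real method.

```java
import java.util.HashMap;
import java.util.Map;

public class InsertIntoValidationSketch {

  /** Local stand-in for ksqlDB's KsqlException. */
  static final class KsqlException extends RuntimeException {
    KsqlException(String message) {
      super(message);
    }
  }

  /** Hypothetical, simplified stand-in for a DataSource. */
  static final class DataSource {
    final String name;
    final String topic;
    final boolean readOnlyTopic;
    final boolean hasHeaderColumns;

    DataSource(String name, String topic, boolean readOnlyTopic, boolean hasHeaderColumns) {
      this.name = name;
      this.topic = topic;
      this.readOnlyTopic = readOnlyTopic;
      this.hasHeaderColumns = hasHeaderColumns;
    }
  }

  /** Rejects an INSERT INTO target that is unknown, read-only, or has header columns. */
  static void validateInsertIntoQueries(Map<String, DataSource> metaStore, String target) {
    DataSource source = metaStore.get(target);
    if (source == null) {
      throw new KsqlException("Cannot insert into an unknown stream: " + target);
    }
    if (source.readOnlyTopic) {
      throw new KsqlException("Cannot insert into read-only topic: " + source.topic);
    }
    if (source.hasHeaderColumns) {
      throw new KsqlException(
          "Cannot insert into " + source.name + " because it has header columns");
    }
  }

  /** Returns the validation error message, or null when the target is accepted. */
  static String errorFor(Map<String, DataSource> metaStore, String target) {
    try {
      validateInsertIntoQueries(metaStore, target);
      return null;
    } catch (KsqlException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    Map<String, DataSource> metaStore = new HashMap<>();
    metaStore.put("ORDERS", new DataSource("ORDERS", "orders", false, false));
    metaStore.put("AUDIT", new DataSource("AUDIT", "audit-log", true, false));
    System.out.println(errorFor(metaStore, "ORDERS"));
    System.out.println(errorFor(metaStore, "AUDIT"));
    System.out.println(errorFor(metaStore, "MISSING"));
  }
}
```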