SchemaRegisterInjector¶
SchemaRegisterInjector is an Injector.
SchemaRegisterInjector handles the following Statements:
SchemaRegisterInjector relies on the following overrides (of the SessionConfig of the given ConfiguredStatement):
Creating Instance¶
SchemaRegisterInjector takes the following to be created:
SchemaRegisterInjector is created when:
Injectorsis requested for NO_TOPIC_DELETE chain
Injecting Metadata¶
<T extends Statement> ConfiguredStatement<T> inject(
ConfiguredStatement<T> statement)
inject is part of the Injector abstraction.
inject branches off based on the type of the given Statement:
In the end, inject stripSchemaIdConfig.
registerForCreateAs¶
void registerForCreateAs(
ConfiguredStatement<? extends CreateAsSelect> cas)
registerForCreateAs...FIXME
registerForCreateAs is used when:
SchemaRegisterInjectoris requested to inject (with a CreateAsSelect statement)
registerForCreateSource¶
void registerForCreateSource(
ConfiguredStatement<? extends CreateSource> cs)
registerForCreateSource takes the CreateSource (from the given ConfiguredStatement) and builds a LogicalSchema (based on TableElements).
registerForCreateSource builds FormatInfos for the keys and values and converts them to Format.
registerForCreateSource builds SerdeFeatures for the values.
registerForCreateSource takes the rawKeySchema and the rawValueSchema from overrides (of the SessionConfig of the given ConfiguredStatement if available).
In the end, registerForCreateSource registerSchemas (unless already registered).
registerForCreateSource is used when:
SchemaRegisterInjectoris requested to inject (with a CreateSource statement)
tryGetFormat¶
Format tryGetFormat(
FormatInfo formatInfo,
boolean isKey,
String statementText)
tryGetFormat...FIXME
stripSchemaIdConfig¶
<T extends Statement> ConfiguredStatement<T> stripSchemaIdConfig(
ConfiguredStatement<T> statement)
stripSchemaIdConfig requests the SessionConfig (of the given ConfiguredStatement) for overrides.
stripSchemaIdConfig removes KEY_SCHEMA_ID and VALUE_SCHEMA_ID from the overrides (and creates a new ConfiguredStatement) if specified. Otherwise, with no KEY_SCHEMA_ID and no VALUE_SCHEMA_ID, stripSchemaIdConfig does nothing and simply returns the given ConfiguredStatement.
stripSchemaIdConfig is used when:
SchemaRegisterInjectoris requested to inject
registerSchemas¶
void registerSchemas(
LogicalSchema schema,
Pair<SchemaAndId, SchemaAndId> kvRawSchema,
String kafkaTopic,
FormatInfo keyFormat,
SerdeFeatures keySerdeFeatures,
FormatInfo valueFormat,
SerdeFeatures valueSerdeFeatures,
KsqlConfig config,
String statementText,
boolean registerIfSchemaExists)
registerSchemas...FIXME
registerSchemas is used when:
SchemaRegisterInjectoris requested to registerForCreateSource and registerForCreateAs
registerSchema¶
void registerSchema(
List<? extends SimpleColumn> schema,
String topic,
FormatInfo formatInfo,
SerdeFeatures serdeFeatures,
KsqlConfig config,
String statementText,
boolean registerIfSchemaExists,
String subject,
boolean isKey)
registerSchema...FIXME
registerRawSchema¶
void registerRawSchema(
SchemaAndId schemaAndId,
String topic,
String statementText,
String subject,
Boolean isKey)
registerRawSchema...FIXME
sanityCheck¶
void sanityCheck(
SchemaAndId schemaAndId,
FormatInfo formatInfo,
String topic,
KsqlConfig config,
String statementText,
boolean isKey)
sanityCheck...FIXME
canRegister¶
boolean canRegister(
Format format,
KsqlConfig config,
String topic)
canRegister is true unless the given Format does not support SCHEMA_INFERENCE feature.
canRegister throws a KsqlSchemaRegistryNotConfiguredException unless ksql.schema.registry.url is defined (non-empty):
Cannot create topic '[topic]' with format [name] without configuring 'ksql.schema.registry.url'
canRegister is used when:
SchemaRegisterInjectoris requested to sanityCheck and registerSchema