SchemaRegisterInjector

SchemaRegisterInjector is an Injector.

SchemaRegisterInjector handles the following Statements:

SchemaRegisterInjector relies on the following overrides (of the SessionConfig of the given ConfiguredStatement):

Creating Instance

SchemaRegisterInjector takes the following to be created:

SchemaRegisterInjector is created when:

Injecting Metadata

<T extends Statement> ConfiguredStatement<T> inject(
  ConfiguredStatement<T> statement)

inject is part of the Injector abstraction.


inject branches off based on the type of the given Statement:

In the end, inject calls stripSchemaIdConfig (to strip the schema IDs from the overrides of the given statement).
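The type-based branching can be pictured with the following minimal sketch. It is not the actual ksqlDB code: the Statement, CreateSource, CreateAsSelect and OtherStatement types are simplified, hypothetical stand-ins, and the mapping of statement types to registerForCreateAs and registerForCreateSource is an assumption inferred from the parameter types of those two methods. The sketch only records which step would run.

```java
import java.util.ArrayList;
import java.util.List;

final class InjectDispatchSketch {

  // Hypothetical, simplified stand-ins for the ksqlDB statement types.
  interface Statement {}
  static final class CreateSource implements Statement {}
  static final class CreateAsSelect implements Statement {}
  static final class OtherStatement implements Statement {}

  // Records the steps that would be executed, in order.
  final List<String> calls = new ArrayList<>();

  Statement inject(final Statement statement) {
    // Assumption: one branch per statement type, matching the method signatures.
    if (statement instanceof CreateAsSelect) {
      calls.add("registerForCreateAs");
    } else if (statement instanceof CreateSource) {
      calls.add("registerForCreateSource");
    }
    // Every statement, whatever its type, goes through the final stripping step.
    calls.add("stripSchemaIdConfig");
    return statement;
  }
}
```

In this sketch a statement of any other type skips registration entirely and only goes through the final step.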

registerForCreateAs

void registerForCreateAs(
  ConfiguredStatement<? extends CreateAsSelect> cas)

registerForCreateAs...FIXME


registerForCreateAs is used when:

registerForCreateSource

void registerForCreateSource(
  ConfiguredStatement<? extends CreateSource> cs)

registerForCreateSource takes the CreateSource (from the given ConfiguredStatement) and builds a LogicalSchema (based on TableElements).

registerForCreateSource builds FormatInfos for the keys and values and converts them to Format.

registerForCreateSource builds SerdeFeatures for the values.

registerForCreateSource takes the rawKeySchema and the rawValueSchema from overrides (of the SessionConfig of the given ConfiguredStatement if available).

In the end, registerForCreateSource calls registerSchemas (unless already registered).


registerForCreateSource is used when:

tryGetFormat

Format tryGetFormat(
  FormatInfo formatInfo,
  boolean isKey,
  String statementText)

tryGetFormat...FIXME

stripSchemaIdConfig

<T extends Statement> ConfiguredStatement<T> stripSchemaIdConfig(
  ConfiguredStatement<T> statement)

stripSchemaIdConfig requests the SessionConfig (of the given ConfiguredStatement) for overrides.

stripSchemaIdConfig removes KEY_SCHEMA_ID and VALUE_SCHEMA_ID from the overrides (and creates a new ConfiguredStatement) if specified. Otherwise, with no KEY_SCHEMA_ID and no VALUE_SCHEMA_ID, stripSchemaIdConfig does nothing and simply returns the given ConfiguredStatement.
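The behavior described above can be sketched on a plain map of overrides. This is an illustration only, not the ksqlDB implementation: the class name, the stripSchemaIds helper and the two property-name strings are hypothetical, and a Map stands in for the overrides of the SessionConfig.

```java
import java.util.HashMap;
import java.util.Map;

final class StripSchemaIdSketch {

  // Hypothetical property names standing in for KEY_SCHEMA_ID and VALUE_SCHEMA_ID.
  static final String KEY_SCHEMA_ID = "key_schema_id";
  static final String VALUE_SCHEMA_ID = "value_schema_id";

  /** Returns the given map untouched when no schema ID is set, else a stripped copy. */
  static Map<String, Object> stripSchemaIds(final Map<String, Object> overrides) {
    if (!overrides.containsKey(KEY_SCHEMA_ID) && !overrides.containsKey(VALUE_SCHEMA_ID)) {
      return overrides; // nothing to strip: hand back the same instance
    }
    // Copy first so the caller's overrides are left unmodified.
    final Map<String, Object> stripped = new HashMap<>(overrides);
    stripped.remove(KEY_SCHEMA_ID);
    stripped.remove(VALUE_SCHEMA_ID);
    return stripped;
  }
}
```

Returning the same instance in the "nothing to strip" case mirrors the description that the given ConfiguredStatement is simply returned, while the copy in the other case mirrors the creation of a new ConfiguredStatement.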


stripSchemaIdConfig is used when:

  • SchemaRegisterInjector is requested to inject

registerSchemas

void registerSchemas(
  LogicalSchema schema,
  Pair<SchemaAndId, SchemaAndId> kvRawSchema,
  String kafkaTopic,
  FormatInfo keyFormat,
  SerdeFeatures keySerdeFeatures,
  FormatInfo valueFormat,
  SerdeFeatures valueSerdeFeatures,
  KsqlConfig config,
  String statementText,
  boolean registerIfSchemaExists)

registerSchemas...FIXME


registerSchemas is used when:

registerSchema

void registerSchema(
  List<? extends SimpleColumn> schema,
  String topic,
  FormatInfo formatInfo,
  SerdeFeatures serdeFeatures,
  KsqlConfig config,
  String statementText,
  boolean registerIfSchemaExists,
  String subject,
  boolean isKey)

registerSchema...FIXME

registerRawSchema

void registerRawSchema(
  SchemaAndId schemaAndId,
  String topic,
  String statementText,
  String subject,
  Boolean isKey)

registerRawSchema...FIXME

sanityCheck

void sanityCheck(
  SchemaAndId schemaAndId,
  FormatInfo formatInfo,
  String topic,
  KsqlConfig config,
  String statementText,
  boolean isKey)

sanityCheck...FIXME

canRegister

boolean canRegister(
  Format format,
  KsqlConfig config,
  String topic)

canRegister returns false when the given Format does not support the SCHEMA_INFERENCE feature, and true otherwise.

canRegister throws a KsqlSchemaRegistryNotConfiguredException when ksql.schema.registry.url is not defined (empty):

Cannot create topic '[topic]' with format [name] without configuring 'ksql.schema.registry.url'
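The two rules can be combined into a minimal sketch like the one below. It is written under stated assumptions rather than copied from ksqlDB: the Format interface, its supportsSchemaInference method and the exception class are hypothetical stand-ins, the Schema Registry URL is passed as a plain String instead of being read from KsqlConfig, and the order of the two checks (feature support first, URL second) is an assumption.

```java
final class CanRegisterSketch {

  /** Hypothetical stand-in for a Format that may support schema inference. */
  interface Format {
    String name();
    boolean supportsSchemaInference();
  }

  /** Hypothetical stand-in for KsqlSchemaRegistryNotConfiguredException. */
  static final class SchemaRegistryNotConfiguredException extends RuntimeException {
    SchemaRegistryNotConfiguredException(final String message) {
      super(message);
    }
  }

  /** Small factory so callers do not need anonymous classes. */
  static Format format(final String name, final boolean supportsInference) {
    return new Format() {
      @Override public String name() { return name; }
      @Override public boolean supportsSchemaInference() { return supportsInference; }
    };
  }

  static boolean canRegister(final Format format, final String schemaRegistryUrl, final String topic) {
    // Assumed order: formats without schema inference are never registered.
    if (!format.supportsSchemaInference()) {
      return false;
    }
    // A schema-aware format requires a configured Schema Registry URL.
    if (schemaRegistryUrl == null || schemaRegistryUrl.isEmpty()) {
      throw new SchemaRegistryNotConfiguredException(
          "Cannot create topic '" + topic + "' with format " + format.name()
              + " without configuring 'ksql.schema.registry.url'");
    }
    return true;
  }
}
```

With this ordering, a format without schema inference never triggers the exception, even when no Schema Registry URL is configured.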

canRegister is used when: