

SchemaRegisterInjector is an Injector.

SchemaRegisterInjector handles the following Statements:

SchemaRegisterInjector relies on the following overrides (of the SessionConfig of the given ConfiguredStatement):

Creating Instance

SchemaRegisterInjector takes the following to be created:

SchemaRegisterInjector is created when:

Injecting Metadata

<T extends Statement> ConfiguredStatement<T> inject(
  ConfiguredStatement<T> statement)

inject is part of the Injector abstraction.

inject branches off based on the type of the given Statement:

In the end, inject calls stripSchemaIdConfig and returns the result.
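The branching described above can be sketched roughly as follows. This is a minimal illustration, not the actual ksqlDB code: the types are simplified stand-ins, the helper calls are only recorded as strings, and the choice of the two branches (CreateAsSelect and CreateSource) is an assumption inferred from the registerForCreateAs and registerForCreateSource signatures on this page. ListStreams is a hypothetical placeholder for a statement type with no matching branch.

```java
import java.util.ArrayList;
import java.util.List;

final class InjectDispatchSketch {
  // Simplified stand-ins for the ksqlDB statement types (assumptions for illustration).
  interface Statement {}
  static final class CreateAsSelect implements Statement {}
  static final class CreateSource implements Statement {}
  static final class ListStreams implements Statement {} // hypothetical "other" statement

  // Records which helper would have been called, in order.
  final List<String> calls = new ArrayList<>();

  <T extends Statement> T inject(T statement) {
    if (statement instanceof CreateAsSelect) {
      calls.add("registerForCreateAs");
    } else if (statement instanceof CreateSource) {
      calls.add("registerForCreateSource");
    }
    // Every path ends with the schema ID overrides being stripped.
    calls.add("stripSchemaIdConfig");
    return statement;
  }

  public static void main(String[] args) {
    InjectDispatchSketch injector = new InjectDispatchSketch();
    injector.inject(new CreateSource());
    System.out.println(injector.calls); // [registerForCreateSource, stripSchemaIdConfig]
  }
}
```

A statement that matches neither branch would only go through the final stripSchemaIdConfig step in this sketch.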


void registerForCreateAs(
  ConfiguredStatement<? extends CreateAsSelect> cas)


registerForCreateAs is used when:


void registerForCreateSource(
  ConfiguredStatement<? extends CreateSource> cs)

registerForCreateSource takes the CreateSource (from the given ConfiguredStatement) and builds a LogicalSchema (based on TableElements).

registerForCreateSource builds FormatInfos for the keys and the values and converts each of them to a Format.

registerForCreateSource builds SerdeFeatures for the values.

registerForCreateSource takes the rawKeySchema and the rawValueSchema from the overrides (of the SessionConfig of the given ConfiguredStatement), if available.

In the end, registerForCreateSource calls registerSchemas (unless the schemas are already registered).

registerForCreateSource is used when:


Format tryGetFormat(
  FormatInfo formatInfo,
  boolean isKey,
  String statementText)



<T extends Statement> ConfiguredStatement<T> stripSchemaIdConfig(
  ConfiguredStatement<T> statement)

stripSchemaIdConfig requests the SessionConfig (of the given ConfiguredStatement) for overrides.

If KEY_SCHEMA_ID or VALUE_SCHEMA_ID is specified, stripSchemaIdConfig removes both from the overrides and creates a new ConfiguredStatement. Otherwise, with neither KEY_SCHEMA_ID nor VALUE_SCHEMA_ID present, stripSchemaIdConfig simply returns the given ConfiguredStatement unchanged.

stripSchemaIdConfig is used when:

  • SchemaRegisterInjector is requested to inject
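The stripping behaviour can be sketched with a plain map standing in for the SessionConfig overrides. This is an assumption-laden illustration rather than the real implementation: the string keys below are hypothetical placeholders for the actual KEY_SCHEMA_ID and VALUE_SCHEMA_ID property names, and a new map stands in for the new ConfiguredStatement.

```java
import java.util.HashMap;
import java.util.Map;

final class StripSchemaIdSketch {
  // Hypothetical placeholder keys; the real property names are defined in ksqlDB.
  static final String KEY_SCHEMA_ID = "KEY_SCHEMA_ID";
  static final String VALUE_SCHEMA_ID = "VALUE_SCHEMA_ID";

  /** Returns the given map itself when no schema ID is set, else a copy without the IDs. */
  static Map<String, Object> stripSchemaIds(Map<String, Object> overrides) {
    if (!overrides.containsKey(KEY_SCHEMA_ID) && !overrides.containsKey(VALUE_SCHEMA_ID)) {
      return overrides; // nothing to strip, so hand back the original
    }
    Map<String, Object> stripped = new HashMap<>(overrides); // leave the input untouched
    stripped.remove(KEY_SCHEMA_ID);
    stripped.remove(VALUE_SCHEMA_ID);
    return stripped;
  }

  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(KEY_SCHEMA_ID, 1);
    overrides.put("auto.offset.reset", "earliest");
    System.out.println(stripSchemaIds(overrides)); // {auto.offset.reset=earliest}
  }
}
```

Copying before removing keeps the caller's overrides intact, which mirrors the "creates a new ConfiguredStatement" wording above.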


void registerSchemas(
  LogicalSchema schema,
  Pair<SchemaAndId, SchemaAndId> kvRawSchema,
  String kafkaTopic,
  FormatInfo keyFormat,
  SerdeFeatures keySerdeFeatures,
  FormatInfo valueFormat,
  SerdeFeatures valueSerdeFeatures,
  KsqlConfig config,
  String statementText,
  boolean registerIfSchemaExists)


registerSchemas is used when:


void registerSchema(
  List<? extends SimpleColumn> schema,
  String topic,
  FormatInfo formatInfo,
  SerdeFeatures serdeFeatures,
  KsqlConfig config,
  String statementText,
  boolean registerIfSchemaExists,
  String subject,
  boolean isKey)



void registerRawSchema(
  SchemaAndId schemaAndId,
  String topic,
  String statementText,
  String subject,
  Boolean isKey)



void sanityCheck(
  SchemaAndId schemaAndId,
  FormatInfo formatInfo,
  String topic,
  KsqlConfig config,
  String statementText,
  boolean isKey)



boolean canRegister(
  Format format,
  KsqlConfig config,
  String topic)

canRegister returns false when the given Format does not support the SCHEMA_INFERENCE feature, and true otherwise.

canRegister throws a KsqlSchemaRegistryNotConfiguredException unless ksql.schema.registry.url is defined (non-empty):

Cannot create topic '[topic]' with format [name] without configuring 'ksql.schema.registry.url'
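A minimal sketch of this check, under stated assumptions: the Format and KsqlConfig arguments are reduced to a format name, a boolean flag and a URL string, the exception class is a local stand-in for KsqlSchemaRegistryNotConfiguredException, and the order of the two checks (feature support first, URL second) is assumed rather than taken from the text above.

```java
final class CanRegisterSketch {
  // Local stand-in for ksqlDB's KsqlSchemaRegistryNotConfiguredException.
  static final class SchemaRegistryNotConfiguredException extends RuntimeException {
    SchemaRegistryNotConfiguredException(String message) {
      super(message);
    }
  }

  static boolean canRegister(
      String formatName,
      boolean supportsSchemaInference, // stands in for the Format's SCHEMA_INFERENCE support
      String schemaRegistryUrl,        // stands in for ksql.schema.registry.url
      String topic) {
    if (!supportsSchemaInference) {
      return false; // nothing to register for this format
    }
    if (schemaRegistryUrl == null || schemaRegistryUrl.isEmpty()) {
      throw new SchemaRegistryNotConfiguredException(
          "Cannot create topic '" + topic + "' with format " + formatName
              + " without configuring 'ksql.schema.registry.url'");
    }
    return true;
  }

  public static void main(String[] args) {
    // Format names and the URL below are illustrative values only.
    System.out.println(canRegister("JSON", false, "", "orders"));                     // false
    System.out.println(canRegister("AVRO", true, "http://localhost:8081", "orders")); // true
  }
}
```

With this ordering, a format without schema inference never triggers the exception, even when no Schema Registry URL is configured.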

canRegister is used when: