
InternalTopologyBuilder

InternalTopologyBuilder uses the nodeFactories internal registry to track the nodes of a topology by name. Until a ProcessorTopology is built, the nodes are merely names in a "topology namespace" (with no knowledge about the real nodes of a topology except their names).

Creating Instance

InternalTopologyBuilder takes the following to be created:

  • Topology Name

InternalTopologyBuilder is created when:

nodeFactories

InternalTopologyBuilder uses a nodeFactories internal registry of NodeFactories by name.

A new NodeFactory is added when:

Used when:

Node Groups

Map<Integer, Set<String>> nodeGroups

InternalTopologyBuilder defines the nodeGroups internal registry of node groups (subtopologies) by node group ID, where every node group is the set of names of the nodes in the subtopology.

nodeGroups is initially undefined (null) and is built on demand whenever it is undefined, which is the case after InternalTopologyBuilder is requested for the following:

Node groups are uniquely identified by node group ID (starting from 0).

Used when:

makeNodeGroups

Map<Integer, Set<String>> makeNodeGroups()

makeNodeGroups calls putNodeGroupName for every node (in the nodeFactories registry).

makeNodeGroups uses local mutable nodeGroups and nodeGroupId values that every putNodeGroupName call can modify.

In the end, makeNodeGroups returns the nodeGroups local collection.

putNodeGroupName

int putNodeGroupName(
  String nodeName,
  int nodeGroupId,
  Map<Integer, Set<String>> nodeGroups,
  Map<String, Set<String>> rootToNodeGroup)

putNodeGroupName requests the nodeGrouper for the name of the root node of the given nodeName.

putNodeGroupName looks up the name of the root node in the given rootToNodeGroup.

If a node group is found (by the name of the root node), putNodeGroupName adds the given nodeName to it and returns the given nodeGroupId (unchanged).

Otherwise, if the name of the root node is not among the available node groups (in the given rootToNodeGroup), putNodeGroupName registers a new empty node group under the root name (in the given rootToNodeGroup) and under the given nodeGroupId (in the given nodeGroups), adds the given nodeName to it, and increments the node group ID.

In the end, putNodeGroupName returns the node group ID to use for the next new node group: the given one (when the root node already had a node group) or the incremented one (when a new node group was created).
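The grouping logic of makeNodeGroups and putNodeGroupName can be sketched in Java as follows. This is a minimal sketch, not the Kafka Streams source: the node grouper is modeled as a plain Function from a node name to its root name, the class name NodeGroupsSketch is hypothetical, and nodes are traversed in sorted order (an assumption made here to keep node group IDs deterministic).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical sketch of makeNodeGroups and putNodeGroupName (not the Kafka Streams source).
final class NodeGroupsSketch {

    // Assumption: rootOf plays the role of the node grouper's root lookup.
    static int putNodeGroupName(String nodeName,
                                int nodeGroupId,
                                Map<Integer, Set<String>> nodeGroups,
                                Map<String, Set<String>> rootToNodeGroup,
                                Function<String, String> rootOf) {
        int newNodeGroupId = nodeGroupId;
        String root = rootOf.apply(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            // Unknown root: register a new, empty node group under the current ID
            nodeGroup = new HashSet<>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup);
        }
        // The node joins the node group of its root
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }

    static Map<Integer, Set<String>> makeNodeGroups(Set<String> nodeNames,
                                                    Function<String, String> rootOf) {
        // Local, mutable state shared by all putNodeGroupName calls
        Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
        int nodeGroupId = 0;
        // Sorted traversal (an assumption of this sketch) keeps node group IDs deterministic
        for (String nodeName : new TreeSet<>(nodeNames)) {
            nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup, rootOf);
        }
        return nodeGroups;
    }
}
```

Nodes that share a root end up in the same node group, and a new node group ID is consumed only when a previously unseen root shows up.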

Node Grouper

InternalTopologyBuilder creates a node grouper (QuickUnion<String>) when created.

The node grouper is requested to add a node name for the following:

The node grouper is requested to unite names (of a node and predecessors) for the following:

In the end, the node grouper is requested for a root node in putNodeGroupName.
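A node grouper is a quick-union (disjoint-set) structure. The hypothetical SimpleQuickUnion class below is a minimal sketch of the three operations mentioned above (adding a name, uniting names, and finding a root), written for illustration rather than copied from the QuickUnion class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical quick-union (disjoint-set) sketch of a node grouper.
final class SimpleQuickUnion<T> {
    // Every element points at its parent; a root points at itself
    private final Map<T, T> parents = new HashMap<>();

    // Registers an element as its own (single-element) set
    void add(T id) {
        parents.put(id, id);
    }

    // Follows parent links until reaching an element that is its own parent
    T root(T id) {
        T current = id;
        T parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("id: " + id);
        }
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merges the set of id with the sets of all the other ids under a single root
    @SafeVarargs
    final void unite(T id, T... others) {
        for (T other : others) {
            T rootA = root(id);
            T rootB = root(other);
            if (!rootA.equals(rootB)) {
                parents.put(rootB, rootA);
            }
        }
    }
}
```

Uniting a processor with its predecessors puts them all under one root, which is what lets putNodeGroupName place them in the same node group later.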

Describing Topology

TopologyDescription describe()

describe creates a new TopologyDescription (that is eventually returned).

For every node group, describe checks whether the group contains a global (state) source.

If so, describe calls describeGlobalStore. Otherwise, describe calls describeSubtopology.
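The branching in describe can be sketched as below. It is a simplified sketch under stated assumptions: describeGlobalStore and describeSubtopology are replaced by hypothetical string labels, the global source check is passed in as a Predicate, and node groups are visited in ascending node group ID order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch of how describe routes node groups (labels instead of real descriptions).
final class DescribeSketch {
    static List<String> describe(Map<Integer, Set<String>> nodeGroups,
                                 Predicate<String> isGlobalSource) {
        List<String> description = new ArrayList<>();
        // TreeMap copy visits node groups by ascending node group ID
        for (Map.Entry<Integer, Set<String>> group : new TreeMap<>(nodeGroups).entrySet()) {
            // A node group with any global source node describes a global store
            boolean hasGlobalSource = group.getValue().stream().anyMatch(isGlobalSource);
            description.add(hasGlobalSource
                    ? "GlobalStore-" + group.getKey()
                    : "Subtopology-" + group.getKey());
        }
        return description;
    }
}
```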

describe is used when:

describeGlobalStore

void describeGlobalStore(
  TopologyDescription description,
  Set<String> nodes,
  int id)

describeGlobalStore...FIXME

copartitionSourceGroups

List<Set<String>> copartitionSourceGroups

InternalTopologyBuilder defines copartitionSourceGroups internal registry of groups of source processors that need to be co-partitioned.

A new entry is added when:

The registry is used when InternalTopologyBuilder is requested for the following:

maybeUpdateCopartitionSourceGroups

void maybeUpdateCopartitionSourceGroups(
  String replacedNodeName,
  String optimizedNodeName)

maybeUpdateCopartitionSourceGroups...FIXME

maybeUpdateCopartitionSourceGroups is used when:

copartitionGroups

Collection<Set<String>> copartitionGroups()

copartitionGroups...FIXME

copartitionGroups is used when:

  • RepartitionTopics is requested to setup

copartitionSources

void copartitionSources(
  Collection<String> sourceNodes)

copartitionSources adds the given sourceNodes (as a single group) to the copartitionSourceGroups internal registry.

copartitionSources is used when:

internalTopicNamesWithProperties

Map<String, InternalTopicProperties> internalTopicNamesWithProperties

InternalTopologyBuilder defines internalTopicNamesWithProperties internal registry of all the internal topics with their corresponding properties.

A new internal topic is added when:

The registry is used when:

addInternalTopic

void addInternalTopic(
  String topicName,
  InternalTopicProperties internalTopicProperties)

addInternalTopic...FIXME

addInternalTopic is used when:

validateCopartition

void validateCopartition()

validateCopartition...FIXME

validateCopartition is used when:

Global Topics

Set<String> globalTopics

InternalTopologyBuilder tracks global topics (names) in a globalTopics internal registry.

A new topic name is added in addGlobalStore.

globalTopics is used when:

isGlobalSource

boolean isGlobalSource(
  String nodeName)

isGlobalSource looks up the NodeFactory (by the given nodeName) in the nodeFactories registry.

isGlobalSource is positive (true) when the NodeFactory is a SourceNodeFactory with exactly one topic and that topic is global (registered in globalTopics). Otherwise, isGlobalSource is negative (false).
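The isGlobalSource check can be sketched as follows, assuming hypothetical NodeFactorySketch and SourceNodeFactorySketch classes in place of the real node factories (which carry much more than a name and topics).

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a node factory
class NodeFactorySketch {
    final String name;

    NodeFactorySketch(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a source node factory (subscribed to a list of topics)
final class SourceNodeFactorySketch extends NodeFactorySketch {
    final List<String> topics;

    SourceNodeFactorySketch(String name, List<String> topics) {
        super(name);
        this.topics = topics;
    }
}

final class GlobalSourceCheck {
    static boolean isGlobalSource(String nodeName,
                                  Map<String, NodeFactorySketch> nodeFactories,
                                  Set<String> globalTopics) {
        NodeFactorySketch factory = nodeFactories.get(nodeName);
        // Unknown names (null) and non-source factories are never global sources
        if (factory instanceof SourceNodeFactorySketch) {
            List<String> topics = ((SourceNodeFactorySketch) factory).topics;
            // Exactly one topic, and that topic is registered as global
            return topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0));
        }
        return false;
    }
}
```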

isGlobalSource is used when:

Building Processor Topology

ProcessorTopology build(
  Set<String> nodeGroup)

For every NodeFactory (in the nodeFactories internal registry) whose name is in the given node group (or for every NodeFactory when the given node group is undefined, i.e. null), build does the following:

  1. Requests the NodeFactory to build a ProcessorNode (and registers it in a local registry of processors by name)
  2. For a ProcessorNodeFactory, calls buildProcessorNode
  3. For a SourceNodeFactory, calls buildSourceNode
  4. For a SinkNodeFactory, calls buildSinkNode

In the end, build creates a new ProcessorTopology.
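The filtering and per-type dispatch in build can be sketched like this. It is a minimal sketch, assuming a hypothetical FactoryKind enum in place of the real NodeFactory subclasses and recording the names of the steps instead of building ProcessorNodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified factory kinds (the real code checks NodeFactory subclasses)
enum FactoryKind { SOURCE, PROCESSOR, SINK }

final class BuildSketch {
    // Returns the steps taken for the selected factories, in the map's iteration order
    static List<String> build(Map<String, FactoryKind> nodeFactories, Set<String> nodeGroup) {
        List<String> steps = new ArrayList<>();
        for (Map.Entry<String, FactoryKind> factory : nodeFactories.entrySet()) {
            String name = factory.getKey();
            // A null node group lets every node factory through
            if (nodeGroup != null && !nodeGroup.contains(name)) {
                continue;
            }
            // Every selected factory builds its node first
            steps.add("build " + name);
            // Then a type-specific step wires the node up
            switch (factory.getValue()) {
                case PROCESSOR: steps.add("buildProcessorNode " + name); break;
                case SOURCE: steps.add("buildSourceNode " + name); break;
                case SINK: steps.add("buildSinkNode " + name); break;
            }
        }
        return steps;
    }
}
```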

build is used when:

buildProcessorNode

void buildProcessorNode(
  Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
  Map<String, StateStore> stateStoreMap,
  ProcessorNodeFactory<?, ?, ?, ?> factory,
  ProcessorNode<Object, Object, Object, Object> node)

buildProcessorNode...FIXME

Building Source Node

void buildSourceNode(
  Map<String, SourceNode<?, ?>> topicSourceMap,
  Set<String> repartitionTopics,
  SourceNodeFactory<?, ?> sourceNodeFactory,
  SourceNode<?, ?> node)

buildSourceNode mutates (changes) the given topicSourceMap (SourceNodes by topic name) and repartitionTopics (repartition topic names) collections.

When the pattern (of the given SourceNodeFactory) is defined, buildSourceNode requests the SourceNodeFactory for the topics that match subscriptionUpdates. Otherwise, buildSourceNode uses the topics of the SourceNodeFactory.

buildSourceNode adds every topic (with the given SourceNode) to the given topicSourceMap collection.

For internal topics (in the internalTopicNamesWithProperties registry), buildSourceNode decorates the topic name first, adds the decorated name to the given topicSourceMap collection, and also adds it to the given repartitionTopics collection.
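The topic bookkeeping of buildSourceNode can be sketched as follows. This is a simplified sketch with hypothetical names: the source node is represented by its name, and decorateTopic is assumed to prefix an internal topic name with the application ID and a dash (any named-topology variant is left out).

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the topic bookkeeping in buildSourceNode.
final class SourceTopicsSketch {
    // Assumption: internal topics are decorated as "<applicationId>-<topic>"
    static String decorateTopic(String applicationId, String topic) {
        return applicationId + "-" + topic;
    }

    static void registerSourceTopics(String applicationId,
                                     List<String> topics,
                                     Set<String> internalTopics,
                                     String sourceNode,
                                     Map<String, String> topicSourceMap,
                                     Set<String> repartitionTopics) {
        for (String topic : topics) {
            if (internalTopics.contains(topic)) {
                // Internal topics are registered under their decorated names...
                String decorated = decorateTopic(applicationId, topic);
                topicSourceMap.put(decorated, sourceNode);
                // ...and are tracked as repartition topics, too
                repartitionTopics.add(decorated);
            } else {
                // User topics are registered as-is
                topicSourceMap.put(topic, sourceNode);
            }
        }
    }
}
```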

buildSinkNode

void buildSinkNode(
  Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
  Map<String, SinkNode<?, ?>> topicSinkMap,
  Set<String> repartitionTopics,
  SinkNodeFactory<?, ?> sinkNodeFactory,
  SinkNode<?, ?> node)

buildSinkNode...FIXME

Building (Local) Processor Topology

ProcessorTopology buildTopology()

buildTopology initializes the subscription and then builds a topology of all the node groups except the global node groups.

buildTopology is used when:

Building Processor SubTopology

ProcessorTopology buildSubtopology(
  int topicGroupId)

buildSubtopology takes the topicGroupId node group (from the nodeGroups) and builds a topology.

buildSubtopology is used when:

Building Global State Processor Topology

ProcessorTopology buildGlobalStateTopology()

buildGlobalStateTopology builds a topology of the global node groups if there are any.

buildGlobalStateTopology assumes that the applicationId has already been set and otherwise throws a NullPointerException with the following message:

topology has not completed optimization
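The precondition of buildGlobalStateTopology can be sketched with Objects.requireNonNull. In this sketch, the build function parameter is a hypothetical stand-in for build, and returning null when there are no global node groups is an assumption about what "if there are any" means.

```java
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the null check and the "any global node groups?" guard.
final class GlobalStateTopologySketch {
    static <T> T buildGlobalStateTopology(String applicationId,
                                          Set<String> globalNodeGroups,
                                          Function<Set<String>, T> build) {
        // Fails fast with a NullPointerException unless the application ID has been set
        Objects.requireNonNull(applicationId, "topology has not completed optimization");
        if (globalNodeGroups.isEmpty()) {
            // Assumption: no global node groups means no global state topology
            return null;
        }
        return build.apply(globalNodeGroups);
    }
}
```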

buildGlobalStateTopology is used when:

Rewriting Topology

InternalTopologyBuilder rewriteTopology(
  StreamsConfig config)

rewriteTopology calls setApplicationId with the value of the application.id configuration property.

With cache.max.bytes.buffering enabled, rewriteTopology...FIXME

rewriteTopology requests the global StoreBuilders to build StateStores.

In the end, rewriteTopology saves the StreamsConfig (and returns itself).

rewriteTopology is used when:

globalNodeGroups

Set<String> globalNodeGroups()

globalNodeGroups collects all the nodes of every node group that contains a global source node.

globalNodeGroups is used when:

  • InternalTopologyBuilder is requested to build a local (excluding global state nodes) and global state topologies
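The following sketch shows how the global node groups could be collected and how the local topology could exclude them. It assumes that a node group counts as global when it contains at least one global source node (decided by an isGlobalSource Predicate); the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of globalNodeGroups and the node selection of buildTopology.
final class GlobalNodeGroupsSketch {
    // All nodes of every node group that contains at least one global source node
    static Set<String> globalNodeGroups(Map<Integer, Set<String>> nodeGroups,
                                        Predicate<String> isGlobalSource) {
        Set<String> globalGroups = new HashSet<>();
        for (Set<String> nodes : nodeGroups.values()) {
            if (nodes.stream().anyMatch(isGlobalSource)) {
                globalGroups.addAll(nodes);
            }
        }
        return globalGroups;
    }

    // The nodes of a local topology: all node groups minus the global ones
    static Set<String> localNodes(Map<Integer, Set<String>> nodeGroups,
                                  Predicate<String> isGlobalSource) {
        Set<String> nodes = new HashSet<>();
        nodeGroups.values().forEach(nodes::addAll);
        nodes.removeAll(globalNodeGroups(nodeGroups, isGlobalSource));
        return nodes;
    }
}
```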

Registering Global State Store

<KIn, VIn> void addGlobalStore(
  StoreBuilder<?> storeBuilder,
  String sourceName,
  TimestampExtractor timestampExtractor,
  Deserializer<KIn> keyDeserializer,
  Deserializer<VIn> valueDeserializer,
  String topic,
  String processorName,
  ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)

addGlobalStore creates a ProcessorNodeFactory.

addGlobalStore adds the given topic name to globalTopics.

addGlobalStore creates a SourceNodeFactory and registers it in the nodeFactories under the sourceName.

addGlobalStore...FIXME

addGlobalStore is used when:

Registering Processor

void addProcessor(
  String name,
  ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
  String... predecessorNames)

addProcessor creates a ProcessorNodeFactory (that is then added to nodeFactories registry).

addProcessor adds the name to the nodeGrouper and unites the name with the given predecessorNames.

addProcessor is used when:

Registering StateStore

void addStateStore(
  StoreBuilder<?> storeBuilder,
  String... processorNames) // (1)
void addStateStore(
  StoreBuilder<?> storeBuilder,
  boolean allowOverride,
  String... processorNames)
  1. Uses the allowOverride flag disabled (false)

addStateStore...FIXME

addStateStore is used when:

  • Topology is requested to addProcessor and addStateStore
  • KTableKTableJoinNode is requested to writeToTopology
  • StatefulProcessorNode is requested to writeToTopology
  • StateStoreNode is requested to writeToTopology
  • StreamStreamJoinNode is requested to writeToTopology
  • StreamToTableNode is requested to writeToTopology
  • TableProcessorNode is requested to writeToTopology
  • TableSourceNode is requested to writeToTopology

topicGroups

Map<Subtopology, TopicsInfo> topicGroups()

topicGroups...FIXME

topicGroups is used when:

addSource

void addSource(
  Topology.AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer<?> keyDeserializer,
  Deserializer<?> valDeserializer,
  Pattern topicPattern)
void addSource(
  Topology.AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer<?> keyDeserializer,
  Deserializer<?> valDeserializer,
  String... topics)

addSource creates a new SourceNodeFactory and adds it to the nodeFactories registry (under the given name).

addSource adds every topic (in the given topics) to the sourceTopicNames internal registry.

addSource registers the given name with the topics or the topicPattern (in the nodeToSourceTopics or the nodeToSourcePatterns registries, respectively).

addSource adds the given name to the nodeGrouper and clears out the nodeGroups (so that it is built again the next time it is requested).
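The registry updates of addSource can be sketched as below. This is a simplified sketch with hypothetical names: nodeFactories stores only a description per node name, a HashSet stands in for the node grouper, and the offset reset, timestamp extractor, and deserializers are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical, simplified registries of a topology builder
final class SourceRegistrySketch {
    final Map<String, String> nodeFactories = new HashMap<>();
    final Set<String> sourceTopicNames = new HashSet<>();
    final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
    final Map<String, Pattern> nodeToSourcePatterns = new HashMap<>();
    final Set<String> nodeGrouper = new HashSet<>(); // stand-in for the node grouper
    Map<Integer, Set<String>> nodeGroups = new HashMap<>(); // cached node groups

    // Source subscribed to explicit topic names
    void addSource(String name, String... topics) {
        List<String> topicList = List.of(topics);
        nodeFactories.put(name, "SourceNodeFactory(" + topicList + ")");
        sourceTopicNames.addAll(topicList);
        nodeToSourceTopics.put(name, topicList);
        registerNode(name);
    }

    // Source subscribed to a topic pattern
    void addSource(String name, Pattern topicPattern) {
        nodeFactories.put(name, "SourceNodeFactory(" + topicPattern + ")");
        nodeToSourcePatterns.put(name, topicPattern);
        registerNode(name);
    }

    private void registerNode(String name) {
        nodeGrouper.add(name);
        // Invalidate the cached node groups so they are rebuilt on the next request
        nodeGroups = null;
    }
}
```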

addSource is used when:

  • GroupedTableOperationRepartitionNode is requested to writeToTopology
  • OptimizableRepartitionNode is requested to writeToTopology
  • StreamSourceNode is requested to writeToTopology
  • TableSourceNode is requested to writeToTopology
  • Topology is requested to addSource
  • UnoptimizableRepartitionNode is requested to writeToTopology

topologyName

topologyName is specified using setNamedTopology.

setNamedTopology

void setNamedTopology(
  NamedTopology topology)

setNamedTopology...FIXME

setNamedTopology is used when:

Logging

Enable ALL logging level for org.apache.kafka.streams.processor.internals.InternalTopologyBuilder logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.InternalTopologyBuilder=ALL

Refer to Logging.
