InternalTopologyBuilder¶
InternalTopologyBuilder uses the nodeFactories internal registry to track the nodes of a topology. Until a ProcessorTopology is built, the nodes are merely names in a "topology namespace" (with no knowledge about real nodes in a topology except their names).
Creating Instance¶
InternalTopologyBuilder takes the following to be created:
- Topology Name
InternalTopologyBuilder is created when:
- Topology is created
nodeFactories¶
InternalTopologyBuilder uses a nodeFactories internal registry of NodeFactories by name.
A new NodeFactory is added when:
Used when:
- connectProcessorAndStateStore
- findSourcesForProcessorPredecessors
- makeNodeGroups
- build
- setRegexMatchedTopicsToSourceNodes
- isGlobalSource
- describeGlobalStore
- describeSubtopology
Node Groups¶
Map<Integer, Set<String>> nodeGroups
InternalTopologyBuilder defines a nodeGroups internal registry of subtopologies and an associated group of (source) topics.
nodeGroups is initially undefined (null) and is built on demand (while still undefined) when InternalTopologyBuilder is requested for the following:
Node groups are uniquely identified by node group ID (starting from 0).
Used when:
makeNodeGroups¶
Map<Integer, Set<String>> makeNodeGroups()
For every node (in the nodeFactories registry), makeNodeGroups calls putNodeGroupName.
makeNodeGroups uses local mutable nodeGroups and nodeGroupId values that can be modified by every putNodeGroupName call.
In the end, makeNodeGroups returns the nodeGroups local collection.
putNodeGroupName¶
int putNodeGroupName(
String nodeName,
int nodeGroupId,
Map<Integer, Set<String>> nodeGroups,
Map<String, Set<String>> rootToNodeGroup)
putNodeGroupName requests the nodeGrouper for the name of the root node of the given nodeName.
putNodeGroupName looks up the name of the root node in the given rootToNodeGroup.
If the node group is found (by the name of the root node), putNodeGroupName simply adds the given nodeName and returns the given nodeGroupId (unchanged).
Otherwise, if the name of the root node is not among the available node groups (in the given rootToNodeGroup), putNodeGroupName adds the root name to the given rootToNodeGroup and nodeGroups (with an empty node group and a new node group ID).
In the end, putNodeGroupName returns a new or the given node group ID (based on availability of the root node).
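The steps above can be sketched as follows. This is a minimal, self-contained model and not the Kafka Streams source: the NodeGroupsSketch class and the rootOf function (a stand-in for the node grouper) are hypothetical, and the sketch assumes that the given nodeName is also added to a newly-created node group.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of makeNodeGroups and putNodeGroupName (not the Kafka source).
class NodeGroupsSketch {

    // rootOf stands in for the node grouper: it maps a node name to its root node name.
    static int putNodeGroupName(
            String nodeName,
            int nodeGroupId,
            Map<Integer, Set<String>> nodeGroups,
            Map<String, Set<String>> rootToNodeGroup,
            Function<String, String> rootOf) {
        int newNodeGroupId = nodeGroupId;
        String root = rootOf.apply(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            // Unknown root: register an empty node group under the current ID
            // and advance the ID for the next new group.
            nodeGroup = new HashSet<>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup);
        }
        // Assumption: the node joins the group in both branches.
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }

    static Map<Integer, Set<String>> makeNodeGroups(
            List<String> nodeNames, Function<String, String> rootOf) {
        Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        Map<String, Set<String>> rootToNodeGroup = new LinkedHashMap<>();
        int nodeGroupId = 0; // node group IDs start from 0
        for (String nodeName : nodeNames) {
            nodeGroupId = putNodeGroupName(
                    nodeName, nodeGroupId, nodeGroups, rootToNodeGroup, rootOf);
        }
        return nodeGroups;
    }

    public static void main(String[] args) {
        // Hypothetical roots: "source-a" and "proc-a" share a root, "source-b" is on its own.
        Map<String, String> roots = Map.of(
                "source-a", "source-a", "proc-a", "source-a", "source-b", "source-b");
        System.out.println(
                makeNodeGroups(List.of("source-a", "proc-a", "source-b"), roots::get));
    }
}
```

With the hypothetical roots in main, the two nodes that share a root end up in node group 0 and the remaining node in node group 1.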
Node Grouper¶
InternalTopologyBuilder creates a node grouper (QuickUnion<String>) when created.
The node grouper is requested to add a node name for the following:
The node grouper is requested to unite names (of a node and predecessors) for the following:
In the end, the node grouper is requested for a root node in putNodeGroupName.
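The three requests (add, unite, root) are the operations of a union-find structure. The following is a minimal sketch of the idea only: QuickUnionSketch and its method signatures are assumptions made for illustration and are not the QuickUnion class that InternalTopologyBuilder uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical union-find model of a node grouper (not Kafka's QuickUnion class).
class QuickUnionSketch {
    // Every name points at its parent; a root points at itself.
    private final Map<String, String> parents = new HashMap<>();

    // Registers a name as its own root (a group of one).
    void add(String name) {
        parents.put(name, name);
    }

    // Follows parent links up to the root of the group the name belongs to.
    String root(String name) {
        String current = name;
        String parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("unknown name: " + name);
        }
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merges the groups of the other names into the group of the first name.
    void unite(String name, String... others) {
        String root = root(name);
        for (String other : others) {
            parents.put(root(other), root);
        }
    }

    public static void main(String[] args) {
        QuickUnionSketch nodeGrouper = new QuickUnionSketch();
        nodeGrouper.add("source");
        nodeGrouper.add("processor");
        // A processor is united with its predecessor, so both share one root.
        nodeGrouper.unite("processor", "source");
        System.out.println(nodeGrouper.root("source").equals(nodeGrouper.root("processor")));
    }
}
```

Nodes that were never united keep distinct roots, which is what lets putNodeGroupName tell node groups apart by root name.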
Describing Topology¶
TopologyDescription describe()
describe creates a new TopologyDescription (that is going to be the returned value in the end).
For every node group, describe checks if the group contains a global (state) source.
If so, describe calls describeGlobalStore. Otherwise, describe calls describeSubtopology.
describe is used when:
- Topology is requested to describe
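The per-group decision in describe can be modelled as below. It is a sketch under assumptions: DescribeSketch is hypothetical, the description is reduced to a list of strings (instead of a TopologyDescription), and the isGlobalSource predicate is passed in by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical model of the dispatch in describe (not the Kafka source).
class DescribeSketch {

    // Returns one description line per node group, ordered by node group ID.
    static List<String> describe(
            Map<Integer, Set<String>> nodeGroups, Predicate<String> isGlobalSource) {
        List<String> description = new ArrayList<>();
        Map<Integer, Set<String>> sorted = new TreeMap<>(nodeGroups);
        for (Map.Entry<Integer, Set<String>> group : sorted.entrySet()) {
            // A group with at least one global source is described as a global store.
            boolean global = group.getValue().stream().anyMatch(isGlobalSource);
            description.add((global ? "global store " : "subtopology ") + group.getKey());
        }
        return description;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> nodeGroups = Map.of(
                0, Set.of("source", "processor"),
                1, Set.of("global-source", "global-processor"));
        System.out.println(describe(nodeGroups, "global-source"::equals));
    }
}
```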
describeGlobalStore¶
void describeGlobalStore(
TopologyDescription description,
Set<String> nodes,
int id)
describeGlobalStore...FIXME
copartitionSourceGroups¶
List<Set<String>> copartitionSourceGroups
InternalTopologyBuilder defines copartitionSourceGroups internal registry of groups of source processors that need to be co-partitioned.
A new entry is added when:
- InternalTopologyBuilder is requested to copartitionSources
The registry is used when InternalTopologyBuilder is requested for the following:
maybeUpdateCopartitionSourceGroups¶
void maybeUpdateCopartitionSourceGroups(
String replacedNodeName,
String optimizedNodeName)
maybeUpdateCopartitionSourceGroups...FIXME
maybeUpdateCopartitionSourceGroups is used when:
- InternalStreamsBuilder is requested to maybeOptimizeRepartitionOperations
copartitionGroups¶
Collection<Set<String>> copartitionGroups()
copartitionGroups...FIXME
copartitionGroups is used when:
- RepartitionTopics is requested to setup
copartitionSources¶
void copartitionSources(
Collection<String> sourceNodes)
copartitionSources simply adds the given sourceNodes to the copartitionSourceGroups internal registry.
copartitionSources is used when:
- AbstractStream is requested to ensureCopartitionWith
- KTableImpl is requested to doJoinOnForeignKey
internalTopicNamesWithProperties¶
Map<String, InternalTopicProperties> internalTopicNamesWithProperties
InternalTopologyBuilder defines internalTopicNamesWithProperties internal registry of all the internal topics with their corresponding properties.
A new internal topic is added when:
- InternalTopologyBuilder is requested to addInternalTopic
The registry is used when:
- InternalTopologyBuilder is requested to validateCopartition, buildSinkNode, buildSourceNode, topicGroups, maybeDecorateInternalSourceTopics
- SinkNodeFactory is requested to build a processor node
addInternalTopic¶
void addInternalTopic(
String topicName,
InternalTopicProperties internalTopicProperties)
addInternalTopic...FIXME
addInternalTopic is used when:
- KTableImpl is requested to doJoinOnForeignKey
- GroupedTableOperationRepartitionNode is requested to writeToTopology
- OptimizableRepartitionNode is requested to writeToTopology
- UnoptimizableRepartitionNode is requested to writeToTopology
validateCopartition¶
void validateCopartition()
validateCopartition...FIXME
validateCopartition is used when:
- InternalStreamsBuilder is requested to buildAndOptimizeTopology
Global Topics¶
Set<String> globalTopics
InternalTopologyBuilder tracks global topics (names) in a globalTopics internal registry.
A new topic name is added in addGlobalStore.
globalTopics is used when:
isGlobalSource¶
boolean isGlobalSource(
String nodeName)
isGlobalSource finds a NodeFactory (by given nodeName) in nodeFactories registry.
isGlobalSource is positive (true) when the NodeFactory is a SourceNodeFactory with one topic only that is global. Otherwise, isGlobalSource is negative (false).
isGlobalSource is used when:
- InternalTopologyBuilder is requested to globalNodeGroups, describeGlobalStore and nodeGroupContainsGlobalSourceNode
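The check can be sketched as follows. The nested NodeFactory, SourceNodeFactory and ProcessorNodeFactory types are hypothetical stand-ins that carry only what the check needs, and both registries are passed in as parameters to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of isGlobalSource (not the Kafka source).
class GlobalSourceSketch {

    interface NodeFactory {}

    static final class SourceNodeFactory implements NodeFactory {
        final List<String> topics;

        SourceNodeFactory(List<String> topics) {
            this.topics = topics;
        }
    }

    static final class ProcessorNodeFactory implements NodeFactory {}

    static boolean isGlobalSource(
            String nodeName,
            Map<String, NodeFactory> nodeFactories,
            Set<String> globalTopics) {
        NodeFactory factory = nodeFactories.get(nodeName);
        if (factory instanceof SourceNodeFactory) {
            List<String> topics = ((SourceNodeFactory) factory).topics;
            // Positive only for a source with exactly one topic that is global.
            return topics.size() == 1 && globalTopics.contains(topics.get(0));
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, NodeFactory> nodeFactories = Map.of(
                "global-source", new SourceNodeFactory(List.of("global-topic")),
                "processor", new ProcessorNodeFactory());
        Set<String> globalTopics = Set.of("global-topic");
        System.out.println(isGlobalSource("global-source", nodeFactories, globalTopics));
        System.out.println(isGlobalSource("processor", nodeFactories, globalTopics));
    }
}
```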
Building Processor Topology¶
ProcessorTopology build(
Set<String> nodeGroup)
For every NodeFactory (in the nodeFactories internal registry) whose name is in the given node group (or for all node factories when no node group is given), build does the following:
- Requests the NodeFactory to build a ProcessorNode (and registers it in a local registry of processors by name)
- For ProcessorNodeFactorys, buildProcessorNode
- For SourceNodeFactorys, buildSourceNode
- For SinkNodeFactorys, buildSinkNode
In the end, build creates a new ProcessorTopology.
build is used when:
- InternalTopologyBuilder is requested to build a topology, a subtopology and a global state topology
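The selection of node factories that take part in a build can be sketched as below. BuildSketch and selectNodeFactories are hypothetical names, and an undefined node group is modelled as null, which is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical model of the node group filter in build (not the Kafka source).
class BuildSketch {

    // Returns the names of the node factories that would be requested to build nodes.
    static List<String> selectNodeFactories(
            Collection<String> factoryNames, Set<String> nodeGroup) {
        List<String> selected = new ArrayList<>();
        for (String name : factoryNames) {
            // No node group (null here) lets every node factory through.
            if (nodeGroup == null || nodeGroup.contains(name)) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> factoryNames = List.of("source", "processor", "global-source");
        System.out.println(selectNodeFactories(factoryNames, Set.of("source", "processor")));
        System.out.println(selectNodeFactories(factoryNames, null));
    }
}
```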
buildProcessorNode¶
void buildProcessorNode(
Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
Map<String, StateStore> stateStoreMap,
ProcessorNodeFactory<?, ?, ?, ?> factory,
ProcessorNode<Object, Object, Object, Object> node)
buildProcessorNode...FIXME
Building Source Node¶
void buildSourceNode(
Map<String, SourceNode<?, ?>> topicSourceMap,
Set<String> repartitionTopics,
SourceNodeFactory<?, ?> sourceNodeFactory,
SourceNode<?, ?> node)
buildSourceNode mutates (changes) the given collections: SourceNodes by topic name (topicSourceMap) and repartition topic names (repartitionTopics).
When the pattern (of the given SourceNodeFactory) is defined, buildSourceNode takes the subscriptionUpdates and requests the SourceNodeFactory to get the topics (for the subscription updates). Otherwise, buildSourceNode requests the SourceNodeFactory for the topics.
buildSourceNode adds the topic to the given topicSourceMap collection.
For internal topics (in internalTopicNamesWithProperties registry), buildSourceNode decorates the name before adding to the given topicSourceMap collection and adds them to the given repartitionTopics collection.
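How the two collections are filled can be sketched as follows. The sketch makes several assumptions that are not stated above: the decoration rule (prefixing the topic name with the application ID) is only illustrative, the decorated name is what goes into repartitionTopics, and the node name stands in for the SourceNode itself. SourceTopicsSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of how buildSourceNode fills its collections (not the Kafka source).
class SourceTopicsSketch {

    // Illustrative decoration rule (an assumption): applicationId-topic.
    static String decorateTopic(String applicationId, String topic) {
        return applicationId + "-" + topic;
    }

    static void registerSourceTopics(
            List<String> topics,
            Set<String> internalTopics,
            String applicationId,
            String sourceNodeName,
            Map<String, String> topicSourceMap,
            Set<String> repartitionTopics) {
        for (String topic : topics) {
            if (internalTopics.contains(topic)) {
                // Internal topics are registered under their decorated name
                // and also tracked as repartition topics.
                String decorated = decorateTopic(applicationId, topic);
                topicSourceMap.put(decorated, sourceNodeName);
                repartitionTopics.add(decorated);
            } else {
                topicSourceMap.put(topic, sourceNodeName);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> topicSourceMap = new HashMap<>();
        Set<String> repartitionTopics = new HashSet<>();
        registerSourceTopics(
                List.of("input", "counts-repartition"),
                Set.of("counts-repartition"),
                "my-app",
                "source-1",
                topicSourceMap,
                repartitionTopics);
        System.out.println(topicSourceMap.keySet());
        System.out.println(repartitionTopics);
    }
}
```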
buildSinkNode¶
void buildSinkNode(
Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
Map<String, SinkNode<?, ?>> topicSinkMap,
Set<String> repartitionTopics,
SinkNodeFactory<?, ?> sinkNodeFactory,
SinkNode<?, ?> node)
buildSinkNode...FIXME
Building (Local) Processor Topology¶
ProcessorTopology buildTopology()
buildTopology initializes subscription and then builds a topology (of the node groups without the global node groups).
buildTopology is used when:
- KafkaStreams is created
- TopologyTestDriver is requested to setupTopology
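The node selection for the local topology (all the node groups without the global node groups) can be sketched as a set difference. LocalTopologySketch and localNodes are hypothetical names; the sketch assumes the global node groups are given as a set of node names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the node selection in buildTopology (not the Kafka source).
class LocalTopologySketch {

    static Set<String> localNodes(
            Map<Integer, Set<String>> nodeGroups, Set<String> globalNodeGroups) {
        Set<String> nodes = new HashSet<>();
        // Start with the nodes of all the node groups...
        for (Set<String> group : nodeGroups.values()) {
            nodes.addAll(group);
        }
        // ...and drop the ones that belong to the global node groups.
        nodes.removeAll(globalNodeGroups);
        return nodes;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> nodeGroups = Map.of(
                0, Set.of("source", "processor"),
                1, Set.of("global-source", "global-processor"));
        System.out.println(
                localNodes(nodeGroups, Set.of("global-source", "global-processor")));
    }
}
```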
Building Processor SubTopology¶
ProcessorTopology buildSubtopology(
int topicGroupId)
buildSubtopology takes the topicGroupId node group (from the nodeGroups) and builds a topology.
buildSubtopology is used when:
- ActiveTaskCreator is requested to createTasks and createActiveTaskFromStandby
- StandbyTaskCreator is requested to createTasks and createStandbyTaskFromActive
Building Global State Processor Topology¶
ProcessorTopology buildGlobalStateTopology()
buildGlobalStateTopology builds a topology of the global node groups if there are any.
buildGlobalStateTopology assumes that the applicationId has already been set or throws a NullPointerException:
topology has not completed optimization
buildGlobalStateTopology is used when:
- TopologyMetadata is requested to buildAndVerifyTopology
- TopologyTestDriver is requested to setupTopology
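The precondition above behaves like a null check with a fixed message. The following sketch reproduces only that behaviour: ApplicationIdCheckSketch is hypothetical and returns a string in place of a ProcessorTopology.

```java
import java.util.Objects;

// Hypothetical model of the applicationId precondition (not the Kafka source).
class ApplicationIdCheckSketch {
    private String applicationId; // stays null until setApplicationId is called

    void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    String buildGlobalStateTopology() {
        // Throws a NullPointerException with the message when applicationId is not set.
        Objects.requireNonNull(applicationId, "topology has not completed optimization");
        return "global state topology of " + applicationId;
    }

    public static void main(String[] args) {
        ApplicationIdCheckSketch builder = new ApplicationIdCheckSketch();
        try {
            builder.buildGlobalStateTopology();
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
        builder.setApplicationId("my-app");
        System.out.println(builder.buildGlobalStateTopology());
    }
}
```

In the sketch, as in the description above, the application ID has to be set (e.g. while rewriting the topology) before the global state topology is requested.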
Rewriting Topology¶
InternalTopologyBuilder rewriteTopology(
StreamsConfig config)
rewriteTopology sets the applicationId (using setApplicationId) to the value of application.id configuration property.
With cache.max.bytes.buffering enabled, rewriteTopology...FIXME
rewriteTopology requests the global StoreBuilders to build StateStores.
In the end, rewriteTopology saves the StreamsConfig (and returns itself).
rewriteTopology is used when:
- KafkaStreams is created
- TopologyTestDriver is requested to setupTopology
globalNodeGroups¶
Set<String> globalNodeGroups()
globalNodeGroups collects the nodes of all the node groups that contain a global source node.
globalNodeGroups is used when:
- InternalTopologyBuilder is requested to build a local (excluding global state nodes) and global state topologies
Registering Global State Store¶
<KIn, VIn> void addGlobalStore(
StoreBuilder<?> storeBuilder,
String sourceName,
TimestampExtractor timestampExtractor,
Deserializer<KIn> keyDeserializer,
Deserializer<VIn> valueDeserializer,
String topic,
String processorName,
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
addGlobalStore creates a ProcessorNodeFactory.
addGlobalStore adds the given topic name to globalTopics.
addGlobalStore creates a SourceNodeFactory and registers it in the nodeFactories under the sourceName.
addGlobalStore...FIXME
addGlobalStore is used when:
- Topology is requested to addGlobalStore
- GlobalStoreNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
Registering Processor¶
void addProcessor(
String name,
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
String... predecessorNames)
addProcessor creates a ProcessorNodeFactory (that is then added to nodeFactories registry).
addProcessor adds the name to nodeGrouper and unites the name with the given predecessorNames.
addProcessor is used when:
- Topology is requested to addProcessor
- Some GraphNodes are requested to writeToTopology
Registering StateStore¶
void addStateStore(
StoreBuilder<?> storeBuilder,
String... processorNames) // (1)
void addStateStore(
StoreBuilder<?> storeBuilder,
boolean allowOverride,
String... processorNames)
- Uses allowOverride flag disabled (false)
addStateStore...FIXME
addStateStore is used when:
- Topology is requested to addProcessor and addStateStore
- KTableKTableJoinNode is requested to writeToTopology
- StatefulProcessorNode is requested to writeToTopology
- StateStoreNode is requested to writeToTopology
- StreamStreamJoinNode is requested to writeToTopology
- StreamToTableNode is requested to writeToTopology
- TableProcessorNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
topicGroups¶
Map<Subtopology, TopicsInfo> topicGroups()
topicGroups...FIXME
topicGroups is used when:
- RepartitionTopics is requested to setup
- StreamsPartitionAssignor is requested for consumer group assignment
addSource¶
void addSource(
Topology.AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer<?> keyDeserializer,
Deserializer<?> valDeserializer,
Pattern topicPattern)
void addSource(
Topology.AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer<?> keyDeserializer,
Deserializer<?> valDeserializer,
String... topics)
addSource creates a new SourceNodeFactory and adds it to the nodeFactories registry (under the given name).
addSource adds every topic (in the given topics) to the sourceTopicNames internal registry.
addSource registers the given name with the topics or the topicPattern (in the nodeToSourceTopics or the nodeToSourcePatterns registries, respectively).
addSource adds the given name to nodeGrouper and clears out the nodeGroups (so it has to be built again next time it is requested).
addSource is used when:
- GroupedTableOperationRepartitionNode is requested to writeToTopology
- OptimizableRepartitionNode is requested to writeToTopology
- StreamSourceNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
- Topology is requested to addSource
- UnoptimizableRepartitionNode is requested to writeToTopology
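The bookkeeping in addSource, and the way clearing nodeGroups forces a rebuild on the next request, can be sketched as follows. SourceRegistrySketch is hypothetical: it covers the topics variant only, uses a plain set in place of the node grouper, puts every source in its own node group, and the rebuilds counter exists only to make the lazy rebuild observable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of addSource and the on-demand nodeGroups (not the Kafka source).
class SourceRegistrySketch {
    private final Set<String> sourceTopicNames = new HashSet<>();
    private final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
    private final Set<String> nodeGrouper = new HashSet<>(); // stand-in for the node grouper
    private Map<Integer, Set<String>> nodeGroups; // null until built on demand
    int rebuilds = 0; // counts how often nodeGroups had to be built

    void addSource(String name, String... topics) {
        for (String topic : topics) {
            sourceTopicNames.add(topic);
        }
        nodeToSourceTopics.put(name, List.of(topics));
        nodeGrouper.add(name);
        nodeGroups = null; // clear, so the next request builds it again
    }

    Map<Integer, Set<String>> nodeGroups() {
        if (nodeGroups == null) {
            nodeGroups = makeNodeGroups();
            rebuilds++;
        }
        return nodeGroups;
    }

    // Simplification: one node group per source, in name order.
    private Map<Integer, Set<String>> makeNodeGroups() {
        Map<Integer, Set<String>> groups = new TreeMap<>();
        int nodeGroupId = 0;
        for (String name : new TreeSet<>(nodeGrouper)) {
            groups.put(nodeGroupId++, Set.of(name));
        }
        return groups;
    }

    public static void main(String[] args) {
        SourceRegistrySketch builder = new SourceRegistrySketch();
        builder.addSource("source-a", "topic-a");
        builder.nodeGroups();
        builder.nodeGroups(); // served from the cached registry
        builder.addSource("source-b", "topic-b");
        System.out.println(builder.nodeGroups().size() + " groups, " + builder.rebuilds + " rebuilds");
    }
}
```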
topologyName¶
topologyName is specified using setNamedTopology.
setNamedTopology¶
void setNamedTopology(
NamedTopology topology)
setNamedTopology...FIXME
setNamedTopology is used when:
- NamedTopology is requested to setTopologyName
Logging¶
Enable ALL logging level for org.apache.kafka.streams.processor.internals.InternalTopologyBuilder logger to see what happens inside.
Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.InternalTopologyBuilder=ALL
Refer to Logging.