InternalTopologyBuilder¶
InternalTopologyBuilder
uses the nodeFactories internal registry to track the nodes of a topology. Until a ProcessorTopology is built, the nodes are merely names in a "topology namespace" (with no knowledge about the real nodes in a topology except their names).
Creating Instance¶
InternalTopologyBuilder
takes the following to be created:
- Topology Name
InternalTopologyBuilder
is created when:
Topology
is created
nodeFactories¶
InternalTopologyBuilder
uses a nodeFactories
internal registry of NodeFactories by name.
A new NodeFactory
is added when:
Used when:
- connectProcessorAndStateStore
- findSourcesForProcessorPredecessors
- makeNodeGroups
- build
- setRegexMatchedTopicsToSourceNodes
- isGlobalSource
- describeGlobalStore
- describeSubtopology
Node Groups¶
Map<Integer, Set<String>> nodeGroups
InternalTopologyBuilder
defines nodeGroups
internal registry of subtopologies and an associated group of (source) topics.
nodeGroups
is initially undefined (null) and is built on demand (while still undefined) when InternalTopologyBuilder is requested for the following:
Node groups are uniquely identified by node group ID (starting from 0
).
Used when:
makeNodeGroups¶
Map<Integer, Set<String>> makeNodeGroups()
For every node (in the nodeFactories registry), makeNodeGroups calls putNodeGroupName.
makeNodeGroups uses local mutable nodeGroups and nodeGroupId values that every putNodeGroupName call can modify.
In the end, makeNodeGroups
returns the nodeGroups
local collection.
putNodeGroupName¶
int putNodeGroupName(
String nodeName,
int nodeGroupId,
Map<Integer, Set<String>> nodeGroups,
Map<String, Set<String>> rootToNodeGroup)
putNodeGroupName
requests the nodeGrouper for the name of the root node of the given nodeName
.
putNodeGroupName
looks up the name of the root node in the given rootToNodeGroup
.
If the node group is found (by the name of the root node), putNodeGroupName
simply adds the given nodeName
and returns the given nodeGroupId
(unchanged).
Otherwise, if the name of the root node is not among the available node groups (in the given rootToNodeGroup
), putNodeGroupName
adds the root name to the given rootToNodeGroup
and nodeGroups
(with an empty node group and a new node group ID).
In the end, putNodeGroupName
returns a new or the given node group ID (based on availability of the root node).
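The grouping steps above can be sketched in plain Java as follows. This is a minimal illustration and not the Kafka Streams source: the class name NodeGroupsSketch, the extra rootOf parameter (a stand-in for the node grouper), and the sorted iteration order are assumptions of the sketch. A new group is registered under the given ID, and the next ID is returned.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

final class NodeGroupsSketch {

    // rootOf is a hypothetical stand-in for the node grouper:
    // it maps a node name to the name of its root node.
    static int putNodeGroupName(
            String nodeName,
            int nodeGroupId,
            Map<Integer, Set<String>> nodeGroups,
            Map<String, Set<String>> rootToNodeGroup,
            Function<String, String> rootOf) {
        String root = rootOf.apply(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        int nextNodeGroupId = nodeGroupId;
        if (nodeGroup == null) {
            // Root not seen yet: register an empty group under the current ID
            // and advance the ID for the next new group.
            nodeGroup = new HashSet<>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(nextNodeGroupId++, nodeGroup);
        }
        nodeGroup.add(nodeName);
        return nextNodeGroupId;
    }

    static Map<Integer, Set<String>> makeNodeGroups(
            Set<String> nodeNames,
            Function<String, String> rootOf) {
        Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
        int nodeGroupId = 0;
        // Sorted iteration (an assumption of this sketch) keeps ID assignment predictable.
        for (String nodeName : new TreeSet<>(nodeNames)) {
            nodeGroupId = putNodeGroupName(
                nodeName, nodeGroupId, nodeGroups, rootToNodeGroup, rootOf);
        }
        return nodeGroups;
    }

    public static void main(String[] args) {
        Map<String, String> roots = new HashMap<>();
        roots.put("source-a", "source-a");
        roots.put("processor-a", "source-a");
        roots.put("source-b", "source-b");
        System.out.println(makeNodeGroups(roots.keySet(), roots::get));
    }
}
```

Nodes that share a root end up in the same group, so the number of groups equals the number of distinct roots.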
Node Grouper¶
InternalTopologyBuilder
creates a node grouper (QuickUnion<String>
) when created.
The node grouper is requested to add a node name for the following:
The node grouper is requested to unite names (of a node and predecessors) for the following:
In the end, the node grouper is requested for a root node in putNodeGroupName.
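To make the add, unite and root requests concrete, here is a small union-find sketch. QuickUnionSketch is a hypothetical, String-only simplification written for this page; it is not the QuickUnion class that InternalTopologyBuilder uses, and it omits any path optimization.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

final class QuickUnionSketch {
    // Every name points at a parent name; a root points at itself.
    private final Map<String, String> parents = new HashMap<>();

    // Registers a name as its own root.
    void add(String name) {
        parents.put(name, name);
    }

    // Follows parent links until a self-referencing entry (the root) is found.
    String root(String name) {
        String current = name;
        String parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("name: " + name);
        }
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merges the set of the given name with the sets of all the other names.
    void unite(String name, String... others) {
        for (String other : others) {
            String root1 = root(name);
            String root2 = root(other);
            if (!root1.equals(root2)) {
                parents.put(root1, root2);
            }
        }
    }

    public static void main(String[] args) {
        QuickUnionSketch grouper = new QuickUnionSketch();
        grouper.add("source");
        grouper.add("processor");
        grouper.unite("processor", "source");
        System.out.println(grouper.root("processor"));
    }
}
```

With such a structure, two node names belong to the same node group exactly when they have the same root.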
Describing Topology¶
TopologyDescription describe()
describe
creates a new TopologyDescription
(that is going to be the returned value in the end).
For every node group, describe checks if the group contains a global (state) source.
If so, describe calls describeGlobalStore. Otherwise, describe calls describeSubtopology.
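The per-group dispatch can be pictured with the following sketch. It is only an illustration under stated assumptions: DescribeSketch, the isGlobalSource predicate parameter, and the string-based "description" are all hypothetical and stand in for TopologyDescription, describeGlobalStore and describeSubtopology.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class DescribeSketch {

    // Returns one line per node group; the strings stand in for the
    // entries that would be added to a TopologyDescription.
    static List<String> describe(
            Map<Integer, Set<String>> nodeGroups,
            Predicate<String> isGlobalSource) {
        List<String> description = new ArrayList<>();
        for (Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups.entrySet()) {
            boolean containsGlobalSource =
                nodeGroup.getValue().stream().anyMatch(isGlobalSource);
            if (containsGlobalSource) {
                // Stand-in for describeGlobalStore
                description.add("global store " + nodeGroup.getKey());
            } else {
                // Stand-in for describeSubtopology
                description.add("subtopology " + nodeGroup.getKey());
            }
        }
        return description;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> groups = new java.util.LinkedHashMap<>();
        groups.put(0, java.util.Collections.singleton("source"));
        groups.put(1, java.util.Collections.singleton("global-source"));
        System.out.println(describe(groups, "global-source"::equals));
    }
}
```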
describe
is used when:
Topology
is requested to describe
describeGlobalStore¶
void describeGlobalStore(
TopologyDescription description,
Set<String> nodes,
int id)
describeGlobalStore
...FIXME
copartitionSourceGroups¶
List<Set<String>> copartitionSourceGroups
InternalTopologyBuilder
defines copartitionSourceGroups
internal registry of groups of source processors that need to be co-partitioned.
A new entry is added when:
InternalTopologyBuilder
is requested to copartitionSources
The registry is used when InternalTopologyBuilder
is requested for the following:
maybeUpdateCopartitionSourceGroups¶
void maybeUpdateCopartitionSourceGroups(
String replacedNodeName,
String optimizedNodeName)
maybeUpdateCopartitionSourceGroups
...FIXME
maybeUpdateCopartitionSourceGroups
is used when:
InternalStreamsBuilder
is requested to maybeOptimizeRepartitionOperations
copartitionGroups¶
Collection<Set<String>> copartitionGroups()
copartitionGroups
...FIXME
copartitionGroups
is used when:
RepartitionTopics
is requested to setup
copartitionSources¶
void copartitionSources(
Collection<String> sourceNodes)
copartitionSources
simply adds the given sourceNodes
to the copartitionSourceGroups internal registry.
copartitionSources
is used when:
- AbstractStream is requested to ensureCopartitionWith
- KTableImpl is requested to doJoinOnForeignKey
internalTopicNamesWithProperties¶
Map<String, InternalTopicProperties> internalTopicNamesWithProperties
InternalTopologyBuilder
defines internalTopicNamesWithProperties
internal registry of all the internal topics with their corresponding properties.
A new internal topic is added when:
InternalTopologyBuilder
is requested to addInternalTopic
The registry is used when:
- InternalTopologyBuilder is requested to validateCopartition, buildSinkNode, buildSourceNode, topicGroups, maybeDecorateInternalSourceTopics
- SinkNodeFactory is requested to build a processor node
addInternalTopic¶
void addInternalTopic(
String topicName,
InternalTopicProperties internalTopicProperties)
addInternalTopic
...FIXME
addInternalTopic
is used when:
- KTableImpl is requested to doJoinOnForeignKey
- GroupedTableOperationRepartitionNode is requested to writeToTopology
- OptimizableRepartitionNode is requested to writeToTopology
- UnoptimizableRepartitionNode is requested to writeToTopology
validateCopartition¶
void validateCopartition()
validateCopartition
...FIXME
validateCopartition
is used when:
InternalStreamsBuilder
is requested to buildAndOptimizeTopology
Global Topics¶
Set<String> globalTopics
InternalTopologyBuilder
tracks global topics (names) in a globalTopics
internal registry.
A new topic name is added in addGlobalStore.
globalTopics
is used when:
isGlobalSource¶
boolean isGlobalSource(
String nodeName)
isGlobalSource
finds a NodeFactory (by given nodeName
) in nodeFactories registry.
isGlobalSource
is positive (true
) when the NodeFactory
is a SourceNodeFactory with one topic only that is global. Otherwise, isGlobalSource
is negative (false
).
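A minimal sketch of this check follows. The nested NodeFactory, SourceNodeFactory and ProcessorNodeFactory types are hypothetical stand-ins that only borrow the names used on this page; they are not the Kafka Streams classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GlobalSourceSketch {
    // Hypothetical stand-ins for the node factory types.
    interface NodeFactory { }

    static final class SourceNodeFactory implements NodeFactory {
        final List<String> topics;

        SourceNodeFactory(String... topics) {
            this.topics = Arrays.asList(topics);
        }
    }

    static final class ProcessorNodeFactory implements NodeFactory { }

    final Map<String, NodeFactory> nodeFactories = new HashMap<>();
    final Set<String> globalTopics = new HashSet<>();

    // True only for a source node factory with exactly one topic that is global.
    boolean isGlobalSource(String nodeName) {
        NodeFactory factory = nodeFactories.get(nodeName);
        if (factory instanceof SourceNodeFactory) {
            List<String> topics = ((SourceNodeFactory) factory).topics;
            return topics.size() == 1 && globalTopics.contains(topics.get(0));
        }
        return false;
    }

    public static void main(String[] args) {
        GlobalSourceSketch builder = new GlobalSourceSketch();
        builder.globalTopics.add("global-topic");
        builder.nodeFactories.put("global-source", new SourceNodeFactory("global-topic"));
        System.out.println(builder.isGlobalSource("global-source"));
    }
}
```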
isGlobalSource
is used when:
InternalTopologyBuilder
is requested to globalNodeGroups, describeGlobalStore and nodeGroupContainsGlobalSourceNode
Building Processor Topology¶
ProcessorTopology build(
Set<String> nodeGroup)
For every NodeFactory (in the nodeFactories internal registry) whose name is in the given node group (or for all the node factories when no node group is given), build does the following:
- Requests the NodeFactory to build a ProcessorNode (and registers it in a local registry of processors by name)
- For a ProcessorNodeFactory, buildProcessorNode
- For a SourceNodeFactory, buildSourceNode
- For a SinkNodeFactory, buildSinkNode
In the end, build
creates a new ProcessorTopology.
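The filter-then-dispatch shape of build can be sketched as below. Everything in it is an assumption made for illustration: BuildSketch, the FactoryKind enum (standing in for the three factory types), and the returned map that merely records which build step would run for each node.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class BuildSketch {
    // Hypothetical stand-in for the three kinds of node factories.
    enum FactoryKind { PROCESSOR, SOURCE, SINK }

    // Maps every selected node name to the name of the build step it goes through.
    static Map<String, String> build(
            Map<String, FactoryKind> nodeFactories,
            Set<String> nodeGroup) {
        Map<String, String> processorMap = new LinkedHashMap<>();
        for (Map.Entry<String, FactoryKind> entry : nodeFactories.entrySet()) {
            String name = entry.getKey();
            // A null node group means "no filter": every node factory goes through.
            if (nodeGroup == null || nodeGroup.contains(name)) {
                switch (entry.getValue()) {
                    case PROCESSOR:
                        processorMap.put(name, "buildProcessorNode");
                        break;
                    case SOURCE:
                        processorMap.put(name, "buildSourceNode");
                        break;
                    case SINK:
                        processorMap.put(name, "buildSinkNode");
                        break;
                }
            }
        }
        return processorMap;
    }

    public static void main(String[] args) {
        Map<String, FactoryKind> factories = new LinkedHashMap<>();
        factories.put("source", FactoryKind.SOURCE);
        factories.put("sink", FactoryKind.SINK);
        System.out.println(build(factories, null));
    }
}
```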
build
is used when:
InternalTopologyBuilder
is requested to build a topology, a subtopology and a global state topology
buildProcessorNode¶
void buildProcessorNode(
Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
Map<String, StateStore> stateStoreMap,
ProcessorNodeFactory<?, ?, ?, ?> factory,
ProcessorNode<Object, Object, Object, Object> node)
buildProcessorNode
...FIXME
Building Source Node¶
void buildSourceNode(
Map<String, SourceNode<?, ?>> topicSourceMap,
Set<String> repartitionTopics,
SourceNodeFactory<?, ?> sourceNodeFactory,
SourceNode<?, ?> node)
buildSourceNode
mutates (changes) the two given collections: SourceNodes by topic name (topicSourceMap) and repartition topic names (repartitionTopics).
When the pattern (of the given SourceNodeFactory) is defined, buildSourceNode takes the subscriptionUpdates and requests the SourceNodeFactory to get the topics. Otherwise, buildSourceNode requests the SourceNodeFactory for the topics.
buildSourceNode
adds the topic to the given topicSourceMap
collection.
For internal topics (in internalTopicNamesWithProperties registry), buildSourceNode
decorates the name before adding to the given topicSourceMap
collection and adds them to the given repartitionTopics
collection.
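The handling of regular and internal topics can be sketched as follows. This is a simplified illustration: the topics are assumed to be already resolved, node names stand in for SourceNode instances, and the decoration scheme (prefixing the topic with an application ID and a dash) is an assumption of this sketch rather than something this page states.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BuildSourceNodeSketch {

    static void buildSourceNode(
            Map<String, String> topicSourceMap,
            Set<String> repartitionTopics,
            List<String> topics,
            Set<String> internalTopics,
            String applicationId,
            String nodeName) {
        for (String topic : topics) {
            if (internalTopics.contains(topic)) {
                // Assumed decoration scheme: application ID prefix.
                String decoratedTopic = applicationId + "-" + topic;
                topicSourceMap.put(decoratedTopic, nodeName);
                repartitionTopics.add(decoratedTopic);
            } else {
                topicSourceMap.put(topic, nodeName);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> topicSourceMap = new HashMap<>();
        Set<String> repartitionTopics = new HashSet<>();
        buildSourceNode(
            topicSourceMap,
            repartitionTopics,
            Arrays.asList("orders", "counts-repartition"),
            Collections.singleton("counts-repartition"),
            "my-app",
            "source-node");
        System.out.println(topicSourceMap.keySet());
        System.out.println(repartitionTopics);
    }
}
```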
buildSinkNode¶
void buildSinkNode(
Map<String, ProcessorNode<?, ?, ?, ?>> processorMap,
Map<String, SinkNode<?, ?>> topicSinkMap,
Set<String> repartitionTopics,
SinkNodeFactory<?, ?> sinkNodeFactory,
SinkNode<?, ?> node)
buildSinkNode
...FIXME
Building (Local) Processor Topology¶
ProcessorTopology buildTopology()
buildTopology
initializes subscription and then builds a topology (of the node groups without the global node groups).
buildTopology
is used when:
- KafkaStreams is created
- TopologyTestDriver is requested to setupTopology
Building Processor SubTopology¶
ProcessorTopology buildSubtopology(
int topicGroupId)
buildSubtopology
takes the topicGroupId
node group (from the nodeGroups) and builds a topology.
buildSubtopology
is used when:
- ActiveTaskCreator is requested to createTasks and createActiveTaskFromStandby
- StandbyTaskCreator is requested to createTasks and createStandbyTaskFromActive
Building Global State Processor Topology¶
ProcessorTopology buildGlobalStateTopology()
buildGlobalStateTopology
builds a topology of the global node groups if there are any.
buildGlobalStateTopology
assumes that the applicationId has already been set or throws a NullPointerException
:
topology has not completed optimization
buildGlobalStateTopology
is used when:
- TopologyMetadata is requested to buildAndVerifyTopology
- TopologyTestDriver is requested to setupTopology
Rewriting Topology¶
InternalTopologyBuilder rewriteTopology(
StreamsConfig config)
rewriteTopology
sets the application ID (setApplicationId) to the value of the application.id configuration property.
With cache.max.bytes.buffering enabled, rewriteTopology
...FIXME
rewriteTopology
requests the global StoreBuilders to build StateStores.
In the end, rewriteTopology
saves the StreamsConfig (and returns itself).
rewriteTopology
is used when:
- KafkaStreams is created
- TopologyTestDriver is requested to setupTopology
globalNodeGroups¶
Set<String> globalNodeGroups()
globalNodeGroups
collects global source nodes from all the node groups.
globalNodeGroups
is used when:
InternalTopologyBuilder
is requested to build a local (excluding global state nodes) and global state topologies
Registering Global State Store¶
<KIn, VIn> void addGlobalStore(
StoreBuilder<?> storeBuilder,
String sourceName,
TimestampExtractor timestampExtractor,
Deserializer<KIn> keyDeserializer,
Deserializer<VIn> valueDeserializer,
String topic,
String processorName,
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier)
addGlobalStore
creates a ProcessorNodeFactory.
addGlobalStore
adds the given topic name to globalTopics.
addGlobalStore
creates a SourceNodeFactory and registers it in the nodeFactories under the sourceName
.
addGlobalStore
...FIXME
addGlobalStore
is used when:
- Topology is requested to addGlobalStore
- GlobalStoreNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
Registering Processor¶
void addProcessor(
String name,
ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
String... predecessorNames)
addProcessor
creates a ProcessorNodeFactory (that is then added to nodeFactories registry).
addProcessor
adds the name to nodeGrouper to unite the name with the given predecessorNames
.
addProcessor
is used when:
- Topology is requested to addProcessor
- Some GraphNodes are requested to writeToTopology
Registering StateStore¶
void addStateStore(
StoreBuilder<?> storeBuilder,
String... processorNames) // (1)
void addStateStore(
StoreBuilder<?> storeBuilder,
boolean allowOverride,
String... processorNames)
(1) Uses the allowOverride flag disabled (false)
addStateStore
...FIXME
addStateStore
is used when:
- Topology is requested to addProcessor and addStateStore
- KTableKTableJoinNode is requested to writeToTopology
- StatefulProcessorNode is requested to writeToTopology
- StateStoreNode is requested to writeToTopology
- StreamStreamJoinNode is requested to writeToTopology
- StreamToTableNode is requested to writeToTopology
- TableProcessorNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
topicGroups¶
Map<Subtopology, TopicsInfo> topicGroups()
topicGroups
...FIXME
topicGroups
is used when:
- RepartitionTopics is requested to setup
- StreamsPartitionAssignor is requested for consumer group assignment
addSource¶
void addSource(
Topology.AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer<?> keyDeserializer,
Deserializer<?> valDeserializer,
Pattern topicPattern)
void addSource(
Topology.AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer<?> keyDeserializer,
Deserializer<?> valDeserializer,
String... topics)
addSource
creates a new SourceNodeFactory and adds it to the nodeFactories registry (under the given name
).
addSource
adds every topic (in the given topics
) to the sourceTopicNames internal registry.
addSource
registers the given name
with the topics
or the topicPattern
(in the nodeToSourceTopics or the nodeToSourcePatterns registries, respectively).
addSource
adds the given name
to nodeGrouper and clears out the nodeGroups (so it has to be built again next time it is requested).
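The bookkeeping done by the topic-based addSource can be sketched as below. This is a simplification under stated assumptions: AddSourceSketch is hypothetical, a list of topics stands in for a SourceNodeFactory, a plain set stands in for the node grouper, and the lazily rebuilt nodeGroups() puts every node in its own group just to show the invalidate-and-rebuild behaviour.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AddSourceSketch {
    // The list of topics stands in for a SourceNodeFactory.
    final Map<String, List<String>> nodeFactories = new LinkedHashMap<>();
    final Set<String> sourceTopicNames = new HashSet<>();
    final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
    // Hypothetical stand-in for the node grouper.
    final Set<String> nodeGrouper = new HashSet<>();
    // null means "undefined": rebuilt on the next nodeGroups() request.
    Map<Integer, Set<String>> nodeGroups;

    void addSource(String name, String... topics) {
        List<String> topicList = Arrays.asList(topics);
        nodeFactories.put(name, topicList);
        sourceTopicNames.addAll(topicList);
        nodeToSourceTopics.put(name, topicList);
        nodeGrouper.add(name);
        nodeGroups = null; // invalidate the cached node groups
    }

    Map<Integer, Set<String>> nodeGroups() {
        if (nodeGroups == null) {
            // Simplification: one group per node, only to show the lazy rebuild.
            nodeGroups = new LinkedHashMap<>();
            int id = 0;
            for (String name : nodeFactories.keySet()) {
                nodeGroups.put(id++, new HashSet<>(Collections.singleton(name)));
            }
        }
        return nodeGroups;
    }

    public static void main(String[] args) {
        AddSourceSketch builder = new AddSourceSketch();
        builder.addSource("source-a", "orders", "payments");
        System.out.println(builder.nodeGroups());
    }
}
```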
addSource
is used when:
- GroupedTableOperationRepartitionNode is requested to writeToTopology
- OptimizableRepartitionNode is requested to writeToTopology
- StreamSourceNode is requested to writeToTopology
- TableSourceNode is requested to writeToTopology
- Topology is requested to addSource
- UnoptimizableRepartitionNode is requested to writeToTopology
topologyName¶
topologyName
is specified using setNamedTopology.
setNamedTopology¶
void setNamedTopology(
NamedTopology topology)
setNamedTopology
...FIXME
setNamedTopology
is used when:
NamedTopology
is requested to setTopologyName
Logging¶
Enable ALL
logging level for org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
logger to see what happens inside.
Add the following line to log4j.properties
:
log4j.logger.org.apache.kafka.streams.processor.internals.InternalTopologyBuilder=ALL
Refer to Logging.