Skip to content

KStreamImpl

KStreamImpl is a KStream.

Creating Instance

KStreamImpl takes the following to be created:

KStreamImpl is created when:

  • InternalStreamsBuilder is requested to stream
  • others

repartitionRequired Flag

KStreamImpl is given a repartitionRequired flag when created.

The flag is disabled (false) when:

The repartitionRequired flag is left unchanged (and handed over to the child nodes) in most operators (e.g. filter, mapValues, split).

The flag is enabled (true) when:

The repartitionRequired flag is used in the following operators to add an extra (parent) OptimizableRepartitionNode to the InternalStreamsBuilder:

merge

KStream<K, V> merge(
  InternalStreamsBuilder builder,
  KStream<K, V> stream,
  NamedInternal named)

merge creates a ProcessorGraphNode and turns mergeNode flag on.

merge requests the InternalStreamsBuilder to add the new ProcessorGraphNode (with this and the given KStream's GraphNodes as the parents).

In the end, merge creates a new KStreamImpl for the new ProcessorGraphNode.

merge is part of the KStream abstraction.

join

KStream<K, VR> join(
  GlobalKTable<KG, VG> globalTable,
  ...) // (1)
KStream<K, VR> join(
  KStream<K, VO> otherStream,
  ...)
KStream<K, VR> join(
  KTable<K, VO> table,
  ...)
  1. There are quite a few joins

join...FIXME

join is part of the KStream abstraction.

leftJoin

KStream<K, VR> leftJoin(
  GlobalKTable<KG, VG> globalTable,
  ...) // (1)
KStream<K, VR> leftJoin(
  KStream<K, VO> otherStream,
  ...)
KStream<K, VR> leftJoin(
  KTable<K, VO> table,
  ...)
  1. There are quite a few leftJoins

leftJoin...FIXME

leftJoin is part of the KStream abstraction.

outerJoin

KStream<K, VR> outerJoin(
  KStream<K, VO> otherStream,
  ...) // (1)
  1. There are quite a few outerJoins

outerJoin...FIXME

outerJoin is part of the KStream abstraction.

doJoin

KStream<K, VR> doJoin(
  KStream<K, VO> otherStream,
  ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows,
  StreamJoined<K, V, VO> streamJoined,
  KStreamImplJoin join)

In the end, doJoin requests the given KStreamImplJoin to join.

doJoin is used when:

repartitionForJoin

KStreamImpl<K, V> repartitionForJoin(
  String repartitionName,
  Serde<K> keySerdeOverride,
  Serde<V> valueSerdeOverride)

repartitionForJoin creates an OptimizableRepartitionNodeBuilder.

repartitionForJoin creates a repartitioned source.

Only when there is no OptimizableRepartitionNode defined already or the name (of this KStreamImpl) is different from the given repartitionName, repartitionForJoin requests the OptimizableRepartitionNodeBuilder to build an OptimizableRepartitionNode and requests the InternalStreamsBuilder to add the repartition node (to the GraphNode).

In the end, repartitionForJoin creates a new KStreamImpl (with the repartitionRequired flag off and the OptimizableRepartitionNode as the GraphNode).

groupBy

KGroupedStream<KR, V> groupBy(
  KeyValueMapper<? super K, ? super V, KR> keySelector)
KGroupedStream<KR, V> groupBy(
  KeyValueMapper<? super K, ? super V, KR> keySelector,
  Grouped<KR, V> grouped)

groupBy...FIXME

In the end, groupBy creates a KGroupedStreamImpl (with the repartitionRequired flag enabled).

groupBy is part of the KStream abstraction.

groupByKey

KGroupedStream<K, V> groupByKey()
KGroupedStream<K, V> groupByKey(
  Grouped<K, V> grouped)

groupByKey creates a KGroupedStreamImpl.

groupByKey is part of the KStream abstraction.

repartition

KStream<K, V> repartition()
KStream<K, V> repartition(
  Repartitioned<K, V> repartitioned)

repartition doRepartition

repartition is part of the KStream abstraction.

doRepartition

KStream<K, V> doRepartition(
  Repartitioned<K, V> repartitioned)

doRepartition creates a new UnoptimizableRepartitionNodeBuilder that is then used to createRepartitionedSource.

doRepartition requests the UnoptimizableRepartitionNodeBuilder to build a UnoptimizableRepartitionNode.

doRepartition requests the InternalStreamsBuilder to add the UnoptimizableRepartitionNode to the GraphNode (as a child node).

In the end, doRepartition creates a new KStreamImpl (with the repartitionRequired turned off and the UnoptimizableRepartitionNode as the GraphNode).

Creating Repartitioned Source

String createRepartitionedSource(
  InternalStreamsBuilder builder,
  Serde<K1> keySerde,
  Serde<V1> valueSerde,
  String repartitionTopicNamePrefix,
  StreamPartitioner<K1, V1> streamPartitioner,
  BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder)

createRepartitionedSource...FIXME

createRepartitionedSource is used when:

toTable

KTable<K, V> toTable(
  Named named,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> toTable(...) // (1)
  1. There are other toTables (of less interest)

Only when the repartitionRequired flag is enabled, toTable creates an OptimizableRepartitionNodeBuilder and a repartitioned source. toTable requests the OptimizableRepartitionNodeBuilder to build a (parent) OptimizableRepartitionNode and requests the InternalStreamsBuilder to add the repartition node (to the GraphNode).

toTable...FIXME

toTable is part of the KStream abstraction.

Back to top