
MemoryStream

MemoryStream is a MemoryStreamBase and a MicroBatchStream for Micro-Batch Stream Processing.

Demo

import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlContext = spark.sqlContext
val input = MemoryStream[String](numPartitions = 8)
val input2 = MemoryStream[String]

val df1 = input.toDF()
  .select($"value")

import org.apache.spark.sql.streaming.GroupState
// State type Long and output type String are arbitrary choices for the demo
val stateFunc: (String, Iterator[String], GroupState[Long]) => String = ???
val df2 = input2.toDS()
  .groupByKey(identity)
  .mapGroupsWithState(stateFunc)
  .toDF()
input.addData("1", "2", "3", "4", "5")
import org.apache.spark.sql.streaming.Trigger
val sq = input
  .toDF()
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow)
  .start
sq.awaitTermination()
assert(sq.isActive == false)

Creating Instance

MemoryStream takes the following to be created:

* ID
* SQLContext
* Number of Partitions (optional)

MemoryStream is created using the apply factory.

Number of Partitions

MemoryStream can be given the number of partitions when created. It is undefined (None) by default.

When specified, MemoryStream uses the number of partitions in planInputPartitions to redistribute rows into the given number of partitions in a round-robin manner.
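
The effect of the setting can be observed by checking how many partitions every micro-batch arrives with. The following is a minimal sketch (assuming a spark-shell session, so spark and its implicits are in scope):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlCtx = spark.sqlContext
val partitioned = MemoryStream[Int](numPartitions = 2)
partitioned.addData(1 to 10)

val query = partitioned.toDF()
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // With numPartitions = 2, the 10 rows should be spread round-robin over 2 partitions
    println(s"Batch $batchId arrived with ${batch.rdd.getNumPartitions} partition(s)")
  }
  .start()
query.processAllAvailable()
query.stop()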

Creating MemoryStream

apply[A : Encoder](
  numPartitions: Int)(
  implicit sqlContext: SQLContext): MemoryStream[A]
apply[A : Encoder](
  implicit sqlContext: SQLContext): MemoryStream[A]

apply creates a MemoryStream with a unique id (using an internal AtomicInteger counter).
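
For example, both overloads in a spark-shell session (a sketch; the implicit SQLContext comes from spark.sqlContext):

implicit val sqlCtx = spark.sqlContext
val defaultPartitions = MemoryStream[Int]                    // numPartitions is None
val eightPartitions = MemoryStream[Int](numPartitions = 8)   // explicit number of partitions

Every apply call bumps the internal counter, so the two instances get distinct ids.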

Input Partitions

planInputPartitions(
  start: OffsetV2,
  end: OffsetV2): Array[InputPartition]

planInputPartitions is part of the MicroBatchStream abstraction.


planInputPartitions...FIXME

Adding Data

addData(
  data: TraversableOnce[A]): Offset

addData is part of the MemoryStreamBase abstraction.


addData creates UnsafeRows (using the toRow serializer).

addData prints out the following DEBUG message to the logs:

Adding: [data]

addData increments the currentOffset counter and adds the rows to the batches registry.

In the end, addData returns the currentOffset.
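
For example, a minimal sketch (assuming a spark-shell session):

import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlCtx = spark.sqlContext
val stream = MemoryStream[String]
val first = stream.addData("a", "b")   // rows for one micro-batch
val second = stream.addData(Seq("c"))  // rows for the next micro-batch
// Every addData call increments currentOffset, so the offsets grow by one per call
println(s"$first, $second")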

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=ALL

Refer to Logging.