

MemoryStream is a MemoryStreamBase and a MicroBatchStream for Micro-Batch Stream Processing.


```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

implicit val sqlContext = spark.sqlContext
val input = MemoryStream[String](numPartitions = 8)
val input2 = MemoryStream[String]

val df1 = input.toDF()
  .select($"value")
val df2 = input2.toDS()

input.addData("1", "2", "3", "4", "5")

val sq = df1.writeStream
  .format("memory")
  .queryName("memoryDemo")
  .trigger(Trigger.Once)
  .start()
sq.awaitTermination()
assert(sq.isActive == false)
```

Creating Instance

MemoryStream takes the following to be created:

* Unique ID (generated by an internal AtomicInteger counter)
* SQLContext (implicit)
* Number of partitions (optional, default: None)

MemoryStream is created using the apply factory.

Number of Partitions

MemoryStream can be given the number of partitions when created. It is undefined (None) by default.

When specified, MemoryStream uses the number of partitions in planInputPartitions to redistribute rows across the given number of partitions in a round-robin manner.
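The round-robin redistribution can be sketched in plain Scala. This is a minimal sketch, not Spark's actual code; `redistribute` is an illustrative helper name:

```scala
// Round-robin sketch: row i goes to partition (i % numPartitions),
// so partition sizes differ by at most one.
def redistribute[T](rows: Seq[T], numPartitions: Int): Seq[Seq[T]] =
  (0 until numPartitions).map { p =>
    rows.zipWithIndex.collect { case (row, i) if i % numPartitions == p => row }
  }

val parts = redistribute((1 to 10).toSeq, 3)
```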

Creating MemoryStream

```scala
apply[A : Encoder](
  numPartitions: Int)(
  implicit sqlContext: SQLContext): MemoryStream[A]
apply[A : Encoder](
  implicit sqlContext: SQLContext): MemoryStream[A]
```

apply creates a MemoryStream with a unique id (using an internal AtomicInteger counter).
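The unique-id pattern can be sketched as follows (the counter and method names are illustrative, not necessarily the internal ones):

```scala
import java.util.concurrent.atomic.AtomicInteger

// A shared counter hands out a fresh id per stream, thread-safely.
val memoryStreamId = new AtomicInteger(0)
def nextStreamId(): Int = memoryStreamId.getAndIncrement()
```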

Input Partitions

```scala
planInputPartitions(
  start: OffsetV2,
  end: OffsetV2): Array[InputPartition]
```

planInputPartitions is part of the MicroBatchStream abstraction.
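Conceptually, planInputPartitions selects the batches recorded between the two offsets. A minimal sketch, assuming a start-exclusive/end-inclusive convention and an illustrative `LongOffset` wrapper (not Spark's actual types or code):

```scala
case class LongOffset(offset: Long)

// Select the batches after `start` (exclusive) up to `end` (inclusive);
// an offset of -1 means nothing has been read yet.
def batchesBetween[T](
    batches: Seq[Seq[T]],
    start: LongOffset,
    end: LongOffset): Seq[Seq[T]] =
  batches.slice(start.offset.toInt + 1, end.offset.toInt + 1)
```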


Adding Data

```scala
addData(
  data: TraversableOnce[A]): Offset
```

addData is part of the MemoryStreamBase abstraction.

addData creates UnsafeRows from the input data (using the toRow serializer).

addData prints out the following DEBUG message to the logs:

```text
Adding: [data]
```

addData increments currentOffset counter and adds the rows to the batches registry.

In the end, addData returns the currentOffset.
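The steps above can be sketched as a minimal in-memory registry (names are illustrative; the real implementation serializes rows to UnsafeRows and synchronizes access):

```scala
import scala.collection.mutable.ListBuffer

// Sketch of addData's bookkeeping: record the batch, bump the offset,
// and return the new current offset.
class SimpleMemoryStream[A] {
  private var currentOffset: Long = -1L          // -1 = nothing added yet
  private val batches = ListBuffer.empty[Seq[A]] // the batches registry

  def addData(data: Seq[A]): Long = {
    batches += data      // add the rows to the registry
    currentOffset += 1   // increment the offset counter
    currentOffset        // return the current offset
  }
}
```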


Logging

Enable ALL logging level for the org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/

Refer to Logging.