
Memory Data Source

Memory Data Source is made up of two base implementations that support the older DataSource API V1 and the modern DataSource API V2.

Memory data source supports Micro-Batch and Continuous stream processing modes.

[cols="30,35,35",options="header",width="100%"]
|===
| Stream Processing
| Source
| Sink

| Micro-Batch
| MemoryStream
| MemorySink

| Continuous
| ContinuousMemoryStream
| MemorySinkV2
|===



Memory Data Source is not for production use due to design constraints, e.g. an unbounded in-memory collection of the lines read and no fault recovery.

MemoryStream is designed primarily for unit tests, tutorials and debugging.

=== [[memory-sink]] Memory Sink

Memory sink requires that a streaming query has a name (defined using DataStreamWriter.queryName or queryName option).
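The naming requirement can be sketched as follows. This is a minimal example that assumes a local SparkSession (or `spark-shell`) and the built-in `rate` source; the query names `ratesByMethod` and `ratesByOption` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("memory-sink-query-name")
  .getOrCreate()

val rates = spark.readStream.format("rate").load()

// Option 1: name the query with DataStreamWriter.queryName
val byMethod = rates.writeStream
  .format("memory")
  .queryName("ratesByMethod") // illustrative name
  .start()

// Option 2: name the query with the queryName option
val byOption = rates.writeStream
  .format("memory")
  .option("queryName", "ratesByOption") // illustrative name
  .start()

// Without a name, starting a memory-sink query is expected to fail
val unnamed = Try {
  rates.writeStream.format("memory").start()
}

byMethod.stop()
byOption.stop()
```

Wrapping the unnamed start in `Try` keeps the sketch running to the end, so the failure can be inspected with `unnamed.isFailure` rather than aborting the session.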

Memory sink may optionally define a checkpoint location using the checkpointLocation option. The checkpoint is used for recovery in Complete output mode only.
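A minimal sketch of a memory-sink query that sets a checkpoint location in Complete output mode is shown next. It assumes a local SparkSession and the built-in `rate` source; the temporary checkpoint directory and the query name `countsByParity` are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("memory-sink-checkpoint")
  .getOrCreate()
import spark.implicits._

// Illustrative checkpoint directory (a throwaway temp directory)
val checkpointDir = Files.createTempDirectory("memory-sink-checkpoint").toString

// A streaming aggregation, so that Complete output mode is allowed
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy(($"value" % 2) as "parity")
  .count()

val query = counts.writeStream
  .format("memory")
  .queryName("countsByParity") // illustrative name
  .option("checkpointLocation", checkpointDir)
  .outputMode(OutputMode.Complete())
  .start()

query.stop()
```

Restarting the same query with the same `checkpointLocation` and Complete output mode is the case in which the checkpoint would be used for recovery.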

Memory Sink and CreateViewCommand

When a streaming query with memory sink is started, DataStreamWriter uses Dataset.createOrReplaceTempView operator to create or replace a local temporary view with the name of the query (which is required).
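The temporary view can be observed through the session catalog. The following sketch assumes a local SparkSession and the built-in `rate` source; the query name `ratesView` is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("memory-sink-temp-view")
  .getOrCreate()

val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("memory")
  .queryName("ratesView") // illustrative name
  .start()

// Look up the query name among the tables and views known to the catalog
val view = spark.catalog.listTables().collect().find(_.name == "ratesView")

// The view is queryable like any other table
val rows = spark.table("ratesView")

query.stop()
```

Here `view` is an `Option` of a catalog entry, so `view.map(_.isTemporary)` tells whether the entry is a temporary view.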



.Memory Source in Micro-Batch Stream Processing
[source, scala]
----
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ???

implicit val ctx = spark.sqlContext

// Encoders and the $"..." column syntax
import spark.implicits._
import org.apache.spark.sql.functions.{count, current_timestamp, window}

import org.apache.spark.sql.execution.streaming.MemoryStream
// It uses two implicits: Encoder[Int] and SQLContext
val intsIn = MemoryStream[Int]

val ints = intsIn.toDF
  .withColumn("t", current_timestamp())
  .withWatermark("t", "5 minutes")
  .groupBy(window($"t", "5 minutes") as "window")
  .agg(count("*") as "total")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val totalsOver5mins = ints.
  writeStream.
  format("memory").
  queryName("totalsOver5mins").
  outputMode(OutputMode.Append).
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

val zeroOffset = intsIn.addData(0, 1, 2)
totalsOver5mins.processAllAvailable()
spark.table("totalsOver5mins").show

scala>
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+
----


.Memory Sink in Micro-Batch Stream Processing
[source, scala]
----
val queryName = "memoryDemo"
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("memory")
  .queryName(queryName)
  .start

// The name of the streaming query is an in-memory table
val showAll = spark.sql(s"select * from $queryName")
scala> showAll.show(truncate = false)
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2019-10-10 15:19:16.431|42   |
|2019-10-10 15:19:17.431|43   |
+-----------------------+-----+

// A streaming query started with DataStreamWriter.start is a StreamingQuery
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

// Unwrap the underlying stream execution engine
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

// The sink of the stream execution is a MemorySink
import org.apache.spark.sql.execution.streaming.MemorySink
val sink = se.sink.asInstanceOf[MemorySink]

assert(sink.toString == "MemorySink")
----