
MemorySink

MemorySink is a Table (Spark SQL) with SupportsWrite (Spark SQL) and the STREAMING_WRITE capability for the Memory Data Source.

MemorySink is used for the memory output format and requires a query name.

Creating Instance

MemorySink takes no arguments to be created.

MemorySink is created when:

  • DataStreamWriter is requested to start a streaming query with the memory sink

Query Name

DataStreamWriter makes sure that the query name of a streaming query with memory sink is specified (or throws an AnalysisException).

The queryName is used as the name of a temporary view to query the data written into the sink.

The queryName can be specified using DataStreamWriter.queryName or queryName write option.
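
For example, both of the following give a memory-sink query a name (a minimal sketch, assuming df is a streaming DataFrame):

df.writeStream
  .format("memory")
  .queryName("my_view")           // DataStreamWriter.queryName
  .start()

// ...or equivalently, using the queryName write option
df.writeStream
  .format("memory")
  .option("queryName", "my_view")
  .start()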

Batches Registry

batches: ArrayBuffer[AddedData]

MemorySink creates an empty ArrayBuffer (Scala) of AddedData when created.

batches is an in-memory buffer of streaming batches.

batches holds the data of the streaming batches that have been added (written) to this sink.

batches can be cleared (emptied) using clear.

AddedData

case class AddedData(
  batchId: Long,
  data: Array[Row])

MemorySink defines AddedData case class to store rows (data) per batch (batchId) in the batches registry.

The AddedData is used when:

  • MemorySink is requested to write a batch, allData and latestBatchData
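
As a rough sketch, the registry is a growable buffer of per-batch row arrays (simplified stand-alone code, not Spark's own):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

case class AddedData(batchId: Long, data: Array[Row])

// Every committed batch appends one AddedData entry
val batches = new ArrayBuffer[AddedData]()
batches += AddedData(batchId = 0, data = Array(Row("hello")))
batches += AddedData(batchId = 1, data = Array(Row("world")))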

Name

name(): String

name is part of the Table (Spark SQL) abstraction.


name is MemorySink.

Table Capabilities

capabilities(): java.util.Set[TableCapability]

capabilities is part of the Table (Spark SQL) abstraction.


capabilities is STREAMING_WRITE (Spark SQL).
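
A minimal sketch of the returned set (assuming Spark's TableCapability enum, not the exact implementation):

import java.util
import org.apache.spark.sql.connector.catalog.TableCapability

// A single-element capability set with STREAMING_WRITE
val capabilities: util.Set[TableCapability] =
  util.EnumSet.of(TableCapability.STREAMING_WRITE)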

allData

allData: Seq[Row]

allData returns all the rows that were added to the batches registry.


allData is used when:

  • BasicOperators execution planning strategy is executed (to plan a MemoryPlan as a LocalTableScanExec physical operator)
  • MemoryPlan is requested for the stats
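
Conceptually, allData flattens the per-batch row arrays of the batches registry into a single sequence. A simplified sketch (reusing AddedData from the sketch above; Spark's own code also synchronizes access to the registry):

def allData(batches: Seq[AddedData]): Seq[Row] =
  batches.flatMap(_.data)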

write

write(
  batchId: Long,
  needTruncate: Boolean,
  newRows: Array[Row]): Unit

write adds the given newRows as a new AddedData (with the given batchId) to the batches registry, unless the batch has already been committed (the given batchId is not newer than the latest batch in the registry), in which case write skips it. With the needTruncate flag enabled, write clears the batches registry first (so the new batch overwrites the earlier data rather than appending to it).


write is used when:

  • MemoryStreamingWrite is requested to commit
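
A simplified sketch of the write logic (using the batches registry and AddedData from the sketches above, not Spark's exact code):

def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit =
  batches.synchronized {
    // Only commit a batch that is newer than the latest committed one
    val notCommitted = batches.lastOption.forall(_.batchId < batchId)
    if (notCommitted) {
      if (needTruncate) batches.clear() // overwrite rather than append
      batches += AddedData(batchId, newRows)
    } // an already-committed batch is skipped
  }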

latestBatchData

latestBatchData: Seq[Row]

latestBatchData returns the rows of the last element in the batches registry (i.e., the rows of the most recently added batch).


latestBatchData is intended for tests.
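
A minimal sketch of the lookup (simplified):

def latestBatchData(batches: Seq[AddedData]): Seq[Row] =
  batches.lastOption.toSeq.flatMap(_.data)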

toDebugString

toDebugString: String

toDebugString returns a string with one line per batch in the batches registry (every line with the batchId followed by the data rows of the batch).


toDebugString is intended for tests.
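
A simplified sketch of the formatting (one line per batch):

def toDebugString(batches: Seq[AddedData]): String =
  batches.map { case AddedData(batchId, data) =>
    s"$batchId: ${data.mkString(" ")}"
  }.mkString("\n")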

Clearing Up Batches

clear(): Unit

clear removes (clears) all data in the batches registry.


clear is intended for tests.

Demo

Creating MemorySink Directly

import org.apache.spark.sql.execution.streaming.sources.MemorySink
val sink = new MemorySink()

Using MemorySink For Testing

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val q = df
  .writeStream
  .format("memory")
  .queryName("file_data")
  .start()
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
q.processAllAvailable()
val memorySink = q.sink.asInstanceOf[MemorySink]
memorySink.allData
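
The rows are also available through the temporary view registered under the query name (assuming spark is the active SparkSession):

spark.table("file_data").show()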

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.sources.MemorySink logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.sources.MemorySink=ALL

Refer to Logging