MemorySink¶
MemorySink is a Table (Spark SQL) that SupportsWrite (Spark SQL) with STREAMING_WRITE capability for Memory Data Source.
MemorySink is used for the memory output format and requires a query name.
Creating Instance¶
MemorySink takes no arguments to be created.
MemorySink is created when:
DataStreamWriter is requested to start a streaming query (with the memory sink)
Query Name¶
DataStreamWriter makes sure that the query name of a streaming query with memory sink is specified (or throws an AnalysisException).
The queryName is used as the name of a temporary view that can be queried for the data written to the sink.
The queryName can be specified using DataStreamWriter.queryName or the queryName write option.
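The following sketch shows the memory sink end to end (the rate source and the rate_data query name are illustrative; a SparkSession as spark is assumed):

// The query name becomes the temporary view with the rows written to the sink
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("memory")
  .queryName("rate_data") // required; otherwise AnalysisException
  .start()

// After a few micro-batches, query the temporary view by the query name
spark.table("rate_data").show()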
Batches Registry¶
batches: ArrayBuffer[AddedData]
MemorySink creates an empty ArrayBuffer (Scala) of AddedData when created.
batches is an in-memory buffer of the streaming batches that have been added (written) to this sink.
batches can be cleared (emptied) using clear.
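As a standalone illustration, the following sketch mimics the registry pattern in plain Scala (with String rows instead of Spark's Row; a simplified model, not Spark's own code):

import scala.collection.mutable.ArrayBuffer

case class AddedData(batchId: Long, data: Array[String])

val batches = ArrayBuffer.empty[AddedData]
batches += AddedData(0, Array("a", "b")) // batch 0 written
batches += AddedData(1, Array("c"))      // batch 1 written
assert(batches.flatMap(_.data).toSeq == Seq("a", "b", "c")) // cf. allData
batches.clear()                                             // cf. clear
assert(batches.isEmpty)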
AddedData¶
case class AddedData(
batchId: Long,
data: Array[Row])
MemorySink defines AddedData case class to store rows (data) per batch (batchId) in the batches registry.
New AddedData instances are created in write (to register the rows of a new batch in the batches registry).
Name¶
name(): String
name is part of the Table (Spark SQL) abstraction.
name is MemorySink.
Table Capabilities¶
capabilities(): java.util.Set[TableCapability]
capabilities is part of the Table (Spark SQL) abstraction.
capabilities is STREAMING_WRITE (Spark SQL).
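Both Table properties can be checked directly (a short sketch, assuming the Spark SQL libraries on the classpath):

import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
assert(sink.name() == "MemorySink")
assert(sink.capabilities().contains(TableCapability.STREAMING_WRITE))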
allData¶
allData: Seq[Row]
allData returns all the rows that were added to the batches registry.
allData is used when:
BasicOperators execution planning strategy is executed (to plan a MemoryPlan to a LocalTableScanExec physical operator)
MemoryPlan is requested for the stats
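Below is a short sketch of allData in action (using write, covered in the next section, to add two batches):

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
sink.write(batchId = 0, needTruncate = false, newRows = Array(Row("a")))
sink.write(batchId = 1, needTruncate = false, newRows = Array(Row("b")))
assert(sink.allData == Seq(Row("a"), Row("b"))) // rows across all batches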
write¶
write(
batchId: Long,
needTruncate: Boolean,
newRows: Array[Row]): Unit
write checks whether the given batchId has already been committed (i.e., whether it is not newer than the latest batch ID in the batches registry).
If the batch has not been committed yet, write creates a new AddedData (with the batchId and the newRows) and adds it to the batches registry. With needTruncate enabled, write clears the batches registry first (so the registry holds the rows of the latest batch only).
If the batch has already been committed, write prints out a DEBUG message and skips adding the rows.
write is used when:
MemoryStreamingWrite is requested to commit
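The following sketch illustrates the write semantics described above (assuming write is accessible from user code):

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
sink.write(batchId = 0, needTruncate = false, newRows = Array(Row(0)))
sink.write(batchId = 1, needTruncate = false, newRows = Array(Row(1)))

// An already-committed batch ID is skipped
sink.write(batchId = 1, needTruncate = false, newRows = Array(Row(99)))
assert(sink.allData == Seq(Row(0), Row(1)))

// needTruncate clears the registry before the new batch is added
sink.write(batchId = 2, needTruncate = true, newRows = Array(Row(2)))
assert(sink.allData == Seq(Row(2)))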
latestBatchData¶
latestBatchData: Seq[Row]
latestBatchData returns the rows of the last element in the batches registry (i.e., the rows of the most recent batch).
latestBatchData is intended for tests.
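A tiny sketch of latestBatchData:

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
sink.write(batchId = 0, needTruncate = false, newRows = Array(Row(1), Row(2)))
sink.write(batchId = 1, needTruncate = false, newRows = Array(Row(3)))
assert(sink.latestBatchData == Seq(Row(3))) // rows of the last batch only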
toDebugString¶
toDebugString: String
toDebugString creates a textual representation of the batches registry with one line per batch: the batch ID followed by the rows of the batch.
toDebugString is intended for tests.
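A tiny sketch of toDebugString (the exact line format shown in the comments is an assumption and may vary across Spark versions):

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
sink.write(batchId = 0, needTruncate = false, newRows = Array(Row(1), Row(2)))
sink.write(batchId = 1, needTruncate = false, newRows = Array(Row(3)))
println(sink.toDebugString)
// e.g.
// 0: [1] [2]
// 1: [3]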
Clearing Up Batches¶
clear(): Unit
clear removes (clears) all data in the batches registry.
clear is intended for tests.
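A tiny sketch of clear:

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink

val sink = new MemorySink()
sink.write(batchId = 0, needTruncate = false, newRows = Array(Row("x")))
sink.clear()
assert(sink.allData.isEmpty)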
Demo¶
Creating MemorySink Directly¶
import org.apache.spark.sql.execution.streaming.sources.MemorySink
val sink = new MemorySink()
Using MemorySink For Testing¶
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.sources.MemorySink

// df is assumed to be a streaming DataFrame (e.g. one from spark.readStream)
val q = df
  .writeStream
  .format("memory")
  .queryName("file_data")
  .start()
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
q.processAllAvailable()

val memorySink = q.sink.asInstanceOf[MemorySink]
memorySink.allData
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.sources.MemorySink logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.sources.MemorySink=ALL
Refer to Logging