MemorySink¶
MemorySink is a Table (Spark SQL) that SupportsWrite (Spark SQL) with STREAMING_WRITE capability for Memory Data Source.
MemorySink is used for the memory output format and requires a query name.
Creating Instance¶
MemorySink takes no arguments to be created.
MemorySink is created when:
DataStreamWriter is requested to start a streaming query (with the memory sink)
Query Name¶
DataStreamWriter makes sure that the query name of a streaming query with the memory sink is specified (or throws an AnalysisException).
The queryName is used as the name of a temporary view to query the data written to the sink.
The queryName can be specified using DataStreamWriter.queryName or the queryName write option.
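For example (a minimal sketch, assuming a streaming DataFrame df and demo_data as the query name):

// Either via DataStreamWriter.queryName...
df.writeStream
  .format("memory")
  .queryName("demo_data")
  .start()

// ...or via the queryName write option
df.writeStream
  .format("memory")
  .option("queryName", "demo_data")
  .start()

// The rows written to the sink are then available under the demo_data temporary view
spark.table("demo_data").show(truncate = false)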
Batches Registry¶
batches: ArrayBuffer[AddedData]
MemorySink creates an empty ArrayBuffer (Scala) of AddedData when created.
batches is an in-memory buffer of the streaming batches that have been added (written) to this sink.
batches can be cleared (emptied) using clear.
AddedData¶
case class AddedData(
  batchId: Long,
  data: Array[Row])
MemorySink defines the AddedData case class to store rows (data) per batch (batchId) in the batches registry. AddedData entries are added in write and read back in allData, latestBatchData and toDebugString.
Name¶
name(): String
name is part of the Table (Spark SQL) abstraction.
name is MemorySink.
Table Capabilities¶
capabilities(): java.util.Set[TableCapability]
capabilities is part of the Table (Spark SQL) abstraction.
capabilities is STREAMING_WRITE (Spark SQL).
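A sketch of the capability set (an illustration based on the Table contract, not the exact Spark implementation):

import java.util
import org.apache.spark.sql.connector.catalog.TableCapability

// The sink advertises a single capability: streaming writes
def capabilities(): util.Set[TableCapability] =
  util.EnumSet.of(TableCapability.STREAMING_WRITE)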
allData¶
allData: Seq[Row]
allData returns all the rows that were added to the batches registry.
allData is used when:
BasicOperators execution planning strategy is executed (to plan a MemoryPlan to a LocalTableScanExec physical operator)
MemoryPlan is requested for the stats
write¶
write(
  batchId: Long,
  needTruncate: Boolean,
  newRows: Array[Row]): Unit
write adds the given newRows to the batches registry as a new AddedData with the given batchId, unless the batch has already been committed. With needTruncate enabled, write clears the batches registry first (so only the rows of the latest batch are kept).
write is used when:
MemoryStreamingWrite is requested to commit
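A minimal sketch of the write logic described above (an illustration, not the exact Spark implementation):

import org.apache.spark.sql.Row

// Sketch: register the rows of a batch, skipping already-committed batches
def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
  val notCommitted = batches.lastOption.forall(_.batchId < batchId)
  if (notCommitted) {
    // needTruncate requests dropping earlier batches first
    // (e.g. for a query in Complete output mode)
    if (needTruncate) batches.clear()
    batches += AddedData(batchId, newRows)
  }
  // an already-committed batchId is silently skipped
}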
latestBatchData¶
latestBatchData: Seq[Row]
latestBatchData returns the rows of the last element in the batches registry (i.e. the rows of the most recently added batch).
latestBatchData is intended for tests.
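In other words (a one-line sketch over the batches registry):

def latestBatchData: Seq[Row] = batches.lastOption.toSeq.flatMap(_.data)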
toDebugString¶
toDebugString: String
toDebugString returns a line per batch in the batches registry, with the batch id followed by the rows of that batch.
toDebugString is intended for tests.
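A sketch of that format (one line per batch; an assumption, not the exact Spark implementation):

def toDebugString: String =
  batches.map { case AddedData(batchId, rows) =>
    s"$batchId: ${rows.mkString(" ")}"
  }.mkString("\n")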
Clearing Up Batches¶
clear(): Unit
clear removes (clears) all data in the batches registry.
clear is intended for tests.
Demo¶
Creating MemorySink Directly¶
import org.apache.spark.sql.execution.streaming.sources.MemorySink
val sink = new MemorySink()
Using MemorySink For Testing¶
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.sources.MemorySink

// Unwrap the StreamExecution behind the StreamingQuery to get at the sink
val q = df
  .writeStream
  .format("memory")
  .queryName("file_data")
  .start()
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
q.processAllAvailable()

val memorySink = q.sink.asInstanceOf[MemorySink]
memorySink.allData
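Since the query name doubles as a temporary view (see Query Name above), the same data can also be queried with Spark SQL:

spark.sql("select * from file_data").show(truncate = false)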
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.sources.MemorySink logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.sources.MemorySink=ALL
Refer to Logging.