DataStreamWriter

DataStreamWriter is the interface for Spark developers to describe a streaming write (i.e., how the result of executing a streaming query is sent out to a streaming sink).

Once a streaming write is described, the execution is started using one of the following operators:

- start
- toTable

The returned value is a StreamingQuery.
Accessing DataStreamWriter

DataStreamWriter is available using the Dataset.writeStream method.
import org.apache.spark.sql.streaming.DataStreamWriter

val streamingQuery: Dataset[Long] = ...
assert(streamingQuery.isStreaming)

// Dataset[Long].writeStream gives a DataStreamWriter[Long]
val writer: DataStreamWriter[Long] = streamingQuery.writeStream
foreach

foreach(
  writer: ForeachWriter[T]): DataStreamWriter[T]

Sets the given ForeachWriter as responsible for streaming writes.
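A minimal sketch of a custom ForeachWriter (the row-printing logic is illustrative only): open, process and close are invoked for every partition of every micro-batch.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Illustrative writer that merely prints rows; a real writer would
// typically acquire a connection in open and release it in close.
val rowPrinter = new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = true // accept every partition
  def process(value: Row): Unit = println(value)
  def close(errorOrNull: Throwable): Unit = () // nothing to release
}

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreach(rowPrinter)
  .start
```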
foreachBatch

foreachBatch(
  function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

Sets the source to foreachBatch and creates a ForeachBatchSink to be responsible for streaming writes.
SPARK-24565
As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), the purpose of the method is to expose the micro-batch output as a DataFrame for the following use cases:

- Passing the output rows of each batch to a library that is designed for batch jobs only
- Reusing batch data sources for outputs whose streaming version does not exist
- Multi-writes, where the output rows are written to multiple outputs by writing twice for every batch (see the sketch below)
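A sketch of the multi-writes use case, assuming two hypothetical output paths; the micro-batch is cached so it is computed once and written twice.

```scala
import org.apache.spark.sql.DataFrame

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.persist() // compute the micro-batch once, write it twice
    batch.write.mode("append").parquet("/tmp/sink-one") // hypothetical path
    batch.write.mode("append").json("/tmp/sink-two")    // hypothetical path
    batch.unpersist()
  }
  .start
```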
Streaming Sink by Name

format(
  source: String): DataStreamWriter[T]

Specifies the streaming sink by name (alias).
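A minimal sketch with the built-in parquet file sink selected by name (the paths are placeholders); file-based sinks also need a checkpoint location.

```scala
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("parquet") // select the sink by its short name
  .option("path", "/tmp/rate-output")                   // placeholder
  .option("checkpointLocation", "/tmp/rate-checkpoint") // placeholder
  .start
```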
Output Mode

outputMode(
  outputMode: OutputMode): DataStreamWriter[T]
outputMode(
  outputMode: String): DataStreamWriter[T]

Specifies the OutputMode of the streaming query (what data is sent out to a streaming sink when there is new data available in streaming data sources).

Default: Append
import org.apache.spark.sql.streaming.OutputMode.Update

val streamingQuery = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .outputMode(Update) // <-- update output mode
  .start
Partitioning Streaming Writes

partitionBy(
  colNames: String*): DataStreamWriter[T]

Specifies the columns to partition the streaming output by (for file-based sinks, one directory per distinct value of the partition columns), as in the sketch below.
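A sketch of partitioned streaming writes (the paths and the derived bucket column are illustrative): the output directory gets one subdirectory per distinct value of the partition column (bucket=0, bucket=1, ...).

```scala
import org.apache.spark.sql.functions.col

val sq = spark
  .readStream
  .format("rate")
  .load
  .withColumn("bucket", col("value") % 10) // derived partition column
  .writeStream
  .format("parquet")
  .partitionBy("bucket")
  .option("path", "/tmp/partitioned-output")                   // placeholder
  .option("checkpointLocation", "/tmp/partitioned-checkpoint") // placeholder
  .start
```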
Query Name

queryName(
  queryName: String): DataStreamWriter[T]

Assigns a name to the query (internally, just an additional option with the queryName key).
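A short usage sketch (the name is illustrative); once started, the assigned name is available as StreamingQuery.name.

```scala
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .queryName("rate-to-console")
  .start

assert(sq.name == "rate-to-console")
```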
Trigger

trigger(
  trigger: Trigger): DataStreamWriter[T]

Sets the Trigger for how often the streaming query should be executed.

Default: ProcessingTime(0L), which runs the streaming query as often as possible.
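A short sketch with a fixed micro-batch interval (the 10-second interval is arbitrary):

```scala
import org.apache.spark.sql.streaming.Trigger

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // run a micro-batch every 10s
  .start
```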
Start Streaming Query (Streaming Writes)

start(): StreamingQuery

// Explicit `path` (that could also be specified as an option)
start(
  path: String): StreamingQuery
Public API
start is part of the public API.
Creates and immediately starts a StreamingQuery that is returned as a handle to control the execution of the query.
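A minimal sketch of the path variant (the paths are placeholders); passing the path to start is equivalent to setting the path option.

```scala
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/start-checkpoint") // placeholder
  .start("/tmp/rate-output") // same as .option("path", "/tmp/rate-output").start()
```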
Internally, start branches off per source:

- memory
- foreach
- other formats
FIXME
AnalysisException

start throws an AnalysisException when the source is hive.
val q = spark
  .readStream
  .text("server-logs/*")
  .writeStream
  .format("hive") // (1)!

1. hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
toTable

toTable(
  tableName: String): StreamingQuery
Public API
toTable is part of the public API.
toTable...FIXME
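While the internals are a FIXME above, the public behavior is that toTable creates and starts a streaming query that writes the output to the given table. A minimal sketch (the table name and checkpoint path are placeholders):

```scala
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .option("checkpointLocation", "/tmp/toTable-checkpoint") // placeholder
  .toTable("rate_table") // placeholder table name
```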
startInternal

startInternal(
  sink: Table,
  newOptions: CaseInsensitiveMap[String],
  recoverFromCheckpoint: Boolean = true,
  catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
  catalogTable: Option[CatalogTable] = None): StreamingQuery
startInternal...FIXME
startInternal is used when:

- DataStreamWriter is requested to start and toTable
startQuery

startQuery(
  sink: Table,
  newOptions: CaseInsensitiveMap[String],
  recoverFromCheckpoint: Boolean = true,
  catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
  catalogTable: Option[CatalogTable] = None): StreamingQuery
Assert that the sink is allowed with RealTimeTrigger

When the Trigger is RealTimeTrigger, startQuery asserts that the given sink is among the allowed sinks (subject to spark.sql.streaming.realTimeMode.allowlistCheck).
startQuery requests the StreamingQueryManager to start a streaming query.
startQuery is used when:

- DataStreamWriter is requested to startInternal and toTable