DataStreamWriter¶
DataStreamWriter
is an interface that Spark developers use to describe when and how the result of executing a streaming query is sent out to a streaming sink.
Accessing DataStreamWriter¶
DataStreamWriter
is available using Dataset.writeStream method.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter
val streamingQuery: Dataset[Long] = ...
assert(streamingQuery.isStreaming)
val writer: DataStreamWriter[Long] = streamingQuery.writeStream
Writing to ForeachWriter¶
foreach(
writer: ForeachWriter[T]): DataStreamWriter[T]
Sets the given ForeachWriter to be responsible for streaming writes (one row at a time)
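A minimal sketch of a custom ForeachWriter that simply prints every row to standard output (open, process and close make up the per-partition, per-epoch contract):

import org.apache.spark.sql.{ForeachWriter, Row}

val printWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = true // e.g. open a connection; true means process this partition
  def process(row: Row): Unit = println(row)                 // called once per row
  def close(errorOrNull: Throwable): Unit = ()               // release any resources
}

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreach(printWriter)
  .start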
Writing Micro-Batches to ForeachBatchSink¶
foreachBatch(
function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
Sets the source as foreachBatch and creates a ForeachBatchSink to be responsible for streaming writes.
As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), the purpose of the method is to expose the micro-batch output as a DataFrame for the following use cases:

- Pass the output rows of each batch to a library that is designed for batch jobs only
- Reuse batch data sources for outputs whose streaming version does not exist
- Multi-writes, where the output rows are written to multiple outputs by writing twice for every batch
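A minimal sketch that reuses a batch data source (parquet) for every micro-batch; the /tmp/batch-output path is just for illustration. The function is given an explicit type so the Scala overload of foreachBatch is picked:

import org.apache.spark.sql.{Dataset, Row}

// write every micro-batch with the regular (batch) DataFrameWriter
val writeBatch: (Dataset[Row], Long) => Unit = (batchDF, batchId) =>
  batchDF.write.mode("append").parquet(s"/tmp/batch-output/batch-$batchId")

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch(writeBatch)
  .start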
Streaming Sink by Name¶
format(
source: String): DataStreamWriter[T]
Specifies the streaming sink by name (alias)
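For example, to use the console sink by its alias (other built-in aliases include memory, kafka and file formats such as parquet):

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console") // streaming sink by name
  .start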
Output Mode¶
outputMode(
outputMode: OutputMode): DataStreamWriter[T]
outputMode(
outputMode: String): DataStreamWriter[T]
Specifies the OutputMode of the streaming query (what data is sent out to a streaming sink when there is new data available in streaming data sources)
Default: Append
import org.apache.spark.sql.streaming.OutputMode.Update
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .outputMode(Update) // <-- update output mode
  .start
Partitioning Streaming Writes¶
partitionBy(
colNames: String*): DataStreamWriter[T]
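Partitions the streaming writes by the given columns (for file-based sinks, a directory per distinct value, as with the batch DataFrameWriter.partitionBy). A minimal sketch, with hypothetical /tmp paths, that partitions rate-source output by the minute of the timestamp:

import org.apache.spark.sql.functions.{col, minute}

val sq = spark
  .readStream
  .format("rate")
  .load
  .withColumn("minute", minute(col("timestamp"))) // the partition column
  .writeStream
  .format("parquet")
  .option("path", "/tmp/rate-partitioned")
  .option("checkpointLocation", "/tmp/rate-checkpoint")
  .partitionBy("minute")
  .start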
Query Name¶
queryName(
queryName: String): DataStreamWriter[T]
Assigns a name to the query. Internally, the name is just an additional option with the key queryName.
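For example (the name is available as StreamingQuery.name and shows up in the web UI):

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .queryName("rate2console")
  .start

assert(sq.name == "rate2console")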
Starting Streaming Query (Streaming Writes)¶
start(): StreamingQuery
// Explicit `path` (that could also be specified as an option)
start(
path: String): StreamingQuery
Creates and immediately starts a StreamingQuery that is returned as a handle to control the execution of the query
Internally, start branches off per source:

- memory
- foreach
- other formats
...FIXME
start throws an AnalysisException when the source is hive.
val q = spark.
readStream.
text("server-logs/*").
writeStream.
format("hive") <-- hive format used as a streaming sink
scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
... 48 elided
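For a supported sink, start gives back the StreamingQuery handle right away. A minimal sketch with the memory sink (which requires a query name that doubles as the in-memory table name):

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("memory")
  .queryName("rates") // the in-memory table name
  .start

spark.table("rates").show(truncate = false)
sq.stop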
Trigger¶
trigger(
trigger: Trigger): DataStreamWriter[T]
Sets the Trigger for how often the streaming query should be executed
Default: ProcessingTime(0L), which runs the streaming query as often as possible.
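For example, to execute the streaming query every 10 seconds:

import org.apache.spark.sql.streaming.Trigger

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // one micro-batch every 10 seconds
  .start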