DataStreamWriter

DataStreamWriter is a contract for Spark developers to describe a streaming write (i.e., how the result of executing a streaming query is sent out to a streaming sink).

Once a streaming write is described, an execution is started using one of the following operators:

  • start
  • toTable

The returned value is a StreamingQuery.

Accessing DataStreamWriter

DataStreamWriter is available using the Dataset.writeStream method.

import org.apache.spark.sql.streaming.DataStreamWriter

val streamingData: Dataset[Long] = ...

assert(streamingData.isStreaming)

val writer: DataStreamWriter[Long] = streamingData.writeStream

foreach

foreach(
  writer: ForeachWriter[T]): DataStreamWriter[T]

Sets ForeachWriter as responsible for streaming writes
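
A minimal sketch of a custom ForeachWriter (the lines streaming Dataset[String] is an assumption):

import org.apache.spark.sql.{Dataset, ForeachWriter}

// A ForeachWriter that prints every row to standard output
val printWriter = new ForeachWriter[String] {
  def open(partitionId: Long, epochId: Long): Boolean = true // accept every partition and epoch
  def process(value: String): Unit = println(value)
  def close(errorOrNull: Throwable): Unit = () // nothing to release
}

lines.writeStream
  .foreach(printWriter)
  .start()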

foreachBatch

foreachBatch(
  function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

Sets the source as foreachBatch and registers the given function as responsible for the streaming writes (a ForeachBatchSink is created for it when the query is started).

SPARK-24565

As per SPARK-24565 Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame, the purpose of the method is to expose the output rows of each micro-batch as a DataFrame for the following use cases (see the sketch after this list):

  • Pass the output rows of each batch to a library that is designed for batch jobs only
  • Reuse batch data sources for outputs that have no streaming equivalent
  • Multi-writes, where the output rows are written to multiple outputs (by writing twice for every batch)
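
A minimal sketch of the second use case (the rates streaming DataFrame and the paths are assumptions), reusing a batch (non-streaming) write for every micro-batch:

import org.apache.spark.sql.DataFrame

rates.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // A regular batch write (DataFrameWriter), not a streaming one
    batch.write
      .mode("append")
      .parquet(s"/tmp/rates/batch-$batchId") // hypothetical output path
  }
  .option("checkpointLocation", "/tmp/rates-ck") // hypothetical
  .start()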

Streaming Sink by Name

format(
  source: String): DataStreamWriter[T]

Specifies the streaming sink by name (alias)
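
For example (assuming a rates streaming DataFrame):

rates.writeStream
  .format("console") // the console sink by its short name
  .start()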

Output Mode

outputMode(
  outputMode: OutputMode): DataStreamWriter[T]
outputMode(
  outputMode: String): DataStreamWriter[T]

Specifies the OutputMode of the streaming query (what data is sent out to a streaming sink when there is new data available in streaming data sources)

Default: Append

import org.apache.spark.sql.streaming.OutputMode.Update
val streamingQuery = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .outputMode(Update) // <-- update output mode
  .start

Partitioning Streaming Writes

partitionBy(
  colNames: String*): DataStreamWriter[T]

Specifies the columns to partition the output by (for file-based streaming sinks)
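
A minimal sketch (the events streaming DataFrame with a date column and the paths are assumptions):

events.writeStream
  .format("parquet")
  .option("path", "/tmp/events")                  // hypothetical output path
  .option("checkpointLocation", "/tmp/events-ck") // hypothetical
  .partitionBy("date") // one output directory per date value
  .start()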

Query Name

queryName(
  queryName: String): DataStreamWriter[T]

Assigns a name to the query. Internally, the name is simply registered as an additional option with the queryName key.
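
A minimal sketch (the rates streaming DataFrame and the name are assumptions):

val sq = rates.writeStream
  .queryName("rates-to-console") // hypothetical name
  .format("console")
  .start()

// The name is available on the StreamingQuery handle
assert(sq.name == "rates-to-console")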

Trigger

trigger(
  trigger: Trigger): DataStreamWriter[T]

Sets the Trigger for how often the streaming query should be executed

Default: ProcessingTime(0L), which runs the streaming query as often as possible
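
A minimal sketch (assuming a rates streaming DataFrame):

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10.seconds)) // a micro-batch every 10 seconds
  .start()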

Start Streaming Query (Streaming Writes)

start(): StreamingQuery
// Explicit `path` (that could also be specified as an option)
start(
  path: String): StreamingQuery
Public API

start is part of the public API.

Creates and immediately starts a StreamingQuery that is returned as a handle to control the execution of the query

Internally, start branches off per source.

  • memory
  • foreach
  • other formats

FIXME

AnalysisException

start throws an AnalysisException when the source is hive.

val q = spark
  .readStream
  .text("server-logs/*")
  .writeStream
  .format("hive") // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided

toTable

toTable(
  tableName: String): StreamingQuery
Public API

toTable is part of the public API.

toTable...FIXME
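
A minimal sketch (the rates streaming DataFrame, the table name and the checkpoint location are assumptions):

val sq = rates.writeStream
  .option("checkpointLocation", "/tmp/rates-table-ck") // hypothetical
  .toTable("rates_table")                              // hypothetical table name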

startInternal

startInternal(
  sink: Table,
  newOptions: CaseInsensitiveMap[String],
  recoverFromCheckpoint: Boolean = true,
  catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
  catalogTable: Option[CatalogTable] = None): StreamingQuery

startInternal...FIXME


startInternal is used when:

  • DataStreamWriter is requested to start and toTable

startQuery

startQuery(
  sink: Table,
  newOptions: CaseInsensitiveMap[String],
  recoverFromCheckpoint: Boolean = true,
  catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
  catalogTable: Option[CatalogTable] = None): StreamingQuery
Sinks Allowed with RealTimeTrigger

When this Trigger is RealTimeTrigger, startQuery asserts that the given sink is among the allowed sinks (subject to spark.sql.streaming.realTimeMode.allowlistCheck).

startQuery requests the StreamingQueryManager to start a streaming query.


startQuery is used when:

  • DataStreamWriter is requested to startInternal