ForeachBatchSink
ForeachBatchSink is a streaming sink that represents the DataStreamWriter.foreachBatch streaming operator at runtime.
Type Constructor
ForeachBatchSink[T] is a Scala type constructor with the type parameter T.
ForeachBatchSink was added in Spark 2.4.0 as part of SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame).
import org.apache.spark.sql.Dataset

val q = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (output: Dataset[_], batchId: Long) => // <-- creates a ForeachBatchSink
    println(s"Batch ID: $batchId")
    output.show
  }
  .start
// q.stop

scala> println(q.lastProgress.sink.description)
ForeachBatchSink
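Since ForeachBatchSink[T] is parameterized by the element type of the Dataset being written out, the batch writer function of a typed Dataset receives a Dataset[T] of that very type. The following is a minimal sketch of that (the RateRow case class is made up for illustration; the rate source produces the timestamp and value columns):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import spark.implicits._

// Illustrative element type for the rate source (timestamp, value)
case class RateRow(timestamp: Timestamp, value: Long)

val typedQuery = spark.readStream
  .format("rate")
  .load
  .as[RateRow]     // Dataset[RateRow], so T = RateRow
  .writeStream
  .foreachBatch { (output: Dataset[RateRow], batchId: Long) =>
    // output is typed with the same T as the Dataset being written out
    println(s"Batch ID: $batchId, rows in batch: ${output.count}")
  }
  .start
// typedQuery.stop
```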
Creating Instance
ForeachBatchSink takes the following when created:
- Batch Writer Function ((Dataset[T], Long) => Unit)
- Encoder of type T (ExpressionEncoder[T])
ForeachBatchSink is created when DataStreamWriter is requested to start execution of the streaming query for the DataStreamWriter.foreachBatch streaming operator (i.e. when the writer's source is foreachBatch).
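Taken together, the two constructor arguments give ForeachBatchSink roughly the following shape (a simplified sketch based on the description above, not the exact Spark source; visibility modifiers and other members are omitted):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.Sink

// Simplified sketch of the class shape implied by the constructor arguments above
class ForeachBatchSink[T](
    batchWriter: (Dataset[T], Long) => Unit,
    encoder: ExpressionEncoder[T])
  extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // described in the "Adding Batch" section below
  }

  override def toString(): String = "ForeachBatchSink"
}
```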
Adding Batch
addBatch(
batchId: Long,
data: DataFrame): Unit
addBatch requests the encoder to resolveAndBind (using the output of the analyzed logical plan of the given DataFrame), which gives a "resolved" encoder. addBatch then requests the resolved encoder to create a Deserializer (to convert Spark SQL internal rows, i.e. InternalRow objects, into objects of type T).
addBatch requests the QueryExecution (of the given DataFrame) for the RDD[InternalRow] (which executes the query plan) and applies a map operator to convert the internal rows to Scala objects of type T (using the deserializer).
Important
At this point the "old" DataFrame is no longer processed as a DataFrame but as an RDD[InternalRow]. One of the "side effects" is that whatever logical and physical optimizations may have been applied to the given DataFrame are over at this point, since the query plan has already been executed.
addBatch creates a new Dataset (for the RDD) and executes the batchWriter function (passing in the Dataset and the given batchId).
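The steps above boil down to roughly the following (a condensed sketch of the flow, not the exact Spark source; the exact encoder method names vary across Spark versions):

```scala
// Condensed sketch of the addBatch flow described above (not the exact Spark source)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
  // 1. Resolve and bind the encoder against the analyzed output of the DataFrame
  val resolvedEncoder = encoder.resolveAndBind(
    data.logicalPlan.output,
    data.sparkSession.sessionState.analyzer)
  // 2. Execute the query plan and deserialize every InternalRow into an object of type T
  val rdd = data.queryExecution.toRdd.map[T](resolvedEncoder.createDeserializer())(encoder.clsTag)
  // 3. Re-wrap the RDD[T] in a Dataset[T] and hand it over to the user-defined batch writer
  val ds = data.sparkSession.createDataset(rdd)(encoder)
  batchWriter(ds, batchId)
}
```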
addBatch is a part of the Sink abstraction.
Text Representation

ForeachBatchSink uses ForeachBatchSink as the text representation (toString).