Skip to content

Micro-Batch Stream Processing

Micro-Batch Stream Processing is a stream processing model in Spark Structured Streaming that is used for streaming queries with Trigger.Once and Trigger.ProcessingTime triggers.

Micro-batch stream processing uses MicroBatchExecution stream execution engine.

Micro-batch stream processing supports MicroBatchStream data sources.

Micro-batch stream processing is often referred to as Structured Streaming V1.

Execution Phases

When MicroBatchExecution stream processing engine is requested to run an activated streaming query, the query execution goes through the following execution phases every trigger (micro-batch):

  1. triggerExecution
  2. getOffset for Sources or setOffsetRange for...FIXME
  3. getEndOffset
  4. walCommit
  5. getBatch
  6. queryPlanning
  7. addBatch

Execution phases with execution times are available using StreamingQueryProgress under durationMs.

scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")

Tip

Enable INFO logging level for StreamExecution logger to be notified about durations.

17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {          // <-- Durations (in millis)
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  },

Monitoring

MicroBatchExecution posts events to announce when a streaming query is started and stopped as well as after every micro-batch. StreamingQueryListener interface can be used to intercept the events and act accordingly.

After triggerExecution phase MicroBatchExecution is requested to finish up a streaming batch (trigger) and generate a StreamingQueryProgress (with execution statistics).

MicroBatchExecution prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

MicroBatchExecution posts a QueryProgressEvent with the StreamingQueryProgress and prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]

Demo

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1.minute)) // <-- Uses MicroBatchExecution for execution
  .queryName("rate2console")
  .start
assert(sq.isActive)
scala> sq.explain
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@42363db7
+- *(1) Project [timestamp#54, value#55L]
   +- *(1) ScanV2 rate[timestamp#54, value#55L]
sq.stop