Trigger

Trigger is an abstraction of policies that indicate how often a StreamingQuery should be executed (triggered) and possibly emit new data.

Trigger is used to determine a TriggerExecutor in MicroBatchExecution and ContinuousExecution.

| Trigger | TriggerExecutor | Factory Method |
|---|---|---|
| AvailableNowTrigger | MultiBatchExecutor | Trigger.AvailableNow |
| ContinuousTrigger | ProcessingTimeExecutor | Trigger.Continuous |
| OneTimeTrigger | SingleBatchExecutor | Trigger.Once |
| ProcessingTimeTrigger | ProcessingTimeExecutor | Trigger.ProcessingTime |
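
The pairing above can be read as a pattern match over the trigger type. The following is a stand-alone model only, not Spark's code: `TriggerModel`, its four implementations and `executorFor` are hypothetical names introduced for illustration, and the model merges into one function what Spark splits between MicroBatchExecution and ContinuousExecution.

```scala
// Stand-alone model, NOT Spark's classes: the names below are hypothetical
// stand-ins that only mirror the trigger-to-executor pairing listed above.
trait TriggerModel
case object AvailableNowModel extends TriggerModel
case object OneTimeModel extends TriggerModel
final case class ProcessingTimeModel(intervalMs: Long) extends TriggerModel
final case class ContinuousModel(intervalMs: Long) extends TriggerModel

// Returns the name of the TriggerExecutor paired with each trigger
def executorFor(trigger: TriggerModel): String = trigger match {
  case AvailableNowModel      => "MultiBatchExecutor"
  case OneTimeModel           => "SingleBatchExecutor"
  case _: ProcessingTimeModel => "ProcessingTimeExecutor"
  case _: ContinuousModel     => "ProcessingTimeExecutor"
  case other =>
    // Any other implementation is rejected
    throw new IllegalStateException(s"Unknown type of trigger: $other")
}

assert(executorFor(OneTimeModel) == "SingleBatchExecutor")
assert(executorFor(ContinuousModel(1000L)) == "ProcessingTimeExecutor")
```

The fall-through case is why a trigger type outside the four listed ones cannot be used to start a query.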

Implementations

AvailableNowTrigger

Processes all available data in multiple batches and then terminates the query. Works best with streaming sources that implement SupportsTriggerAvailableNow.

Created by Trigger.AvailableNow

SPARK-36533

AvailableNowTrigger is a new feature in Spark 3.3.0 (tracked under SPARK-36533).

ContinuousTrigger

Continuously processes streaming data, asynchronously checkpointing at the specified interval.

OneTimeTrigger

Processes all available data in one batch and then terminates the query.

ProcessingTimeTrigger

Runs a micro-batch periodically at the specified processing-time interval.

Static Methods

Trigger is also a factory object with static methods to create the policies.

import org.apache.spark.sql.streaming.Trigger

AvailableNow

Trigger AvailableNow()

Creates an AvailableNowTrigger

Supported by SupportsTriggerAvailableNow data sources (e.g., files, Kafka and rate-micro-batch)
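
A minimal sketch of a query that uses Trigger.AvailableNow with the file source. It assumes a `spark` session (as in spark-shell) and a hypothetical, already existing directory `/tmp/trigger-demo/input` with text files. The `maxFilesPerTrigger` option limits every micro-batch to one file, so the available data is likely to be processed in several batches.

```scala
import org.apache.spark.sql.streaming.Trigger

// Assumption: /tmp/trigger-demo/input is a hypothetical directory of text files
// that exists before the query starts
val lines = spark
  .readStream
  .format("text")
  .option("maxFilesPerTrigger", 1) // at most one file per micro-batch
  .load("/tmp/trigger-demo/input")

val query = lines
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow()) // process what is available, then stop
  .queryName("files-available-now")
  .start()

// Returns once all the data available at start time has been processed
query.awaitTermination()
assert(!query.isActive)
```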

Continuous

Trigger Continuous(
  Duration interval)
Trigger Continuous(
  long intervalMs)
Trigger Continuous(
  long interval,
  TimeUnit timeUnit)
Trigger Continuous(
  String interval)

Creates a ContinuousTrigger
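
A minimal sketch of a continuous query, assuming a `spark` session (as in spark-shell). It uses the rate source and the console sink, both of which work in continuous mode; the one-second interval and the query name are arbitrary choices for illustration.

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark
  .readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second")) // checkpoint interval
  .queryName("rate-continuous")
  .start()

// A continuous query keeps running until it is stopped explicitly
query.stop()
assert(!query.isActive)
```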

Once

Trigger Once()

Creates a OneTimeTrigger

ProcessingTime

Trigger ProcessingTime(
  Duration interval)
Trigger ProcessingTime(
  long intervalMs)
Trigger ProcessingTime(
  long interval,
  TimeUnit timeUnit)
Trigger ProcessingTime(
  String interval)

Creates a ProcessingTimeTrigger
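
The four variants are different ways to express the same interval. The sketch below builds a 10-second trigger with each of them; the final check assumes that ProcessingTimeTrigger instances compare by their interval in milliseconds.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

val fromString   = Trigger.ProcessingTime("10 seconds")
val fromMillis   = Trigger.ProcessingTime(10000L)
val fromTimeUnit = Trigger.ProcessingTime(10L, TimeUnit.SECONDS)
val fromDuration = Trigger.ProcessingTime(10.seconds)

// Assumption: triggers with the same interval are equal to one another
assert(Set(fromString, fromMillis, fromTimeUnit, fromDuration).size == 1)
```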

DataStreamWriter

A Trigger of a streaming query is defined using DataStreamWriter.trigger.
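
A minimal sketch of DataStreamWriter.trigger with a processing-time trigger, assuming a `spark` session (as in spark-shell). The five-second interval and the query name are arbitrary.

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark
  .readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds")) // one micro-batch every 5 seconds
  .queryName("rate-every-5s")
  .start()

// The query keeps triggering micro-batches until it is stopped
query.stop()
assert(!query.isActive)
```

When trigger is not called at all, a query runs with a processing-time trigger of 0 ms, i.e. micro-batches are executed as fast as possible.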

Demo: Trigger.Once

import org.apache.spark.sql.streaming.Trigger
val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start
query.awaitTermination() // wait for the single batch to finish
assert(!query.isActive)
println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}

IllegalStateException: Custom Triggers Not Supported

Although Trigger allows for custom implementations, StreamExecution (MicroBatchExecution in the following demo) rejects them with an IllegalStateException.

import org.apache.spark.sql.streaming.Trigger
case object MyTrigger extends Trigger
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(MyTrigger) // <-- use custom trigger
  .queryName("rate-custom-trigger")
  .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:56)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:279)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
  at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:427)
  at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:406)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
  ... 47 elided