Trigger¶
Trigger is an abstraction of policies that indicate how often a StreamingQuery should be executed (triggered) and possibly emit new data.
Trigger is used to determine a TriggerExecutor in MicroBatchExecution and ContinuousExecution.
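At query start, the stream execution engine pattern-matches on the Trigger to pick a TriggerExecutor. What follows is a paraphrased sketch of that selection in MicroBatchExecution (based on Spark 3.3 sources; earlier versions use OneTimeExecutor and have no AvailableNowTrigger):
// Paraphrased sketch of MicroBatchExecution's TriggerExecutor selection (Spark 3.3)
protected val triggerExecutor: TriggerExecutor = trigger match {
  case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
  case OneTimeTrigger => SingleBatchExecutor()
  case AvailableNowTrigger => MultiBatchExecutor()
  case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
Note the catch-all case: it is what rejects custom triggers with an IllegalStateException (demonstrated at the end of this page).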
Implementations¶
AvailableNowTrigger¶
Processes all available data in multiple batches and then terminates the query. Best used with streaming sources that support SupportsTriggerAvailableNow.
Created by Trigger.AvailableNow
AvailableNowTrigger is a new feature in Spark 3.3.0 (tracked under SPARK-36533).
ContinuousTrigger¶
Continuously processes streaming data, asynchronously checkpointing at the specified interval. Created by Trigger.Continuous
OneTimeTrigger¶
Processes all available data in a single batch and then terminates the query. Created by Trigger.Once (see the demo below)
ProcessingTimeTrigger¶
Created by Trigger.ProcessingTime (or directly using ProcessingTimeTrigger.apply)
The only Trigger supported internally in Continuous Stream Processing: ContinuousExecution converts the interval of the user-specified ContinuousTrigger into a ProcessingTimeTrigger for its ProcessingTimeExecutor
Static Methods¶
Trigger also acts as a factory, with static methods to create the trigger policies.
import org.apache.spark.sql.streaming.Trigger
AvailableNow¶
Trigger AvailableNow()
Creates an AvailableNowTrigger
Supported by SupportsTriggerAvailableNow data sources (e.g., files, kafka and rate-micro-batch)
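A minimal sketch of Trigger.AvailableNow in action, assuming the rate-micro-batch source (Spark 3.3.0+) with its rowsPerBatch option; the option value and query name are illustrative:
import org.apache.spark.sql.streaming.Trigger
val sq = spark.
  readStream.
  format("rate-micro-batch").
  option("rowsPerBatch", 10). // <-- illustrative value
  load.
  writeStream.
  format("console").
  trigger(Trigger.AvailableNow()). // <-- process all available data in multiple batches, then stop
  queryName("rate-available-now").
  start

// The query stops on its own once all available data has been processed
sq.awaitTermination()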
Continuous¶
Trigger Continuous(
  Duration interval)
Trigger Continuous(
  long intervalMs)
Trigger Continuous(
  long interval,
  TimeUnit timeUnit)
Trigger Continuous(
  String interval)
Creates a ContinuousTrigger
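For example, a continuous query that checkpoints every second (the rate source and console sink are just for demonstration):
import org.apache.spark.sql.streaming.Trigger
val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.Continuous("1 second")). // <-- asynchronous checkpoints every second
  queryName("rate-continuous").
  start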
Once¶
Trigger Once()
Creates a OneTimeTrigger
ProcessingTime¶
Trigger ProcessingTime(
  Duration interval)
Trigger ProcessingTime(
  long intervalMs)
Trigger ProcessingTime(
  long interval,
  TimeUnit timeUnit)
Trigger ProcessingTime(
  String interval)
Creates a ProcessingTimeTrigger
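All of the following expressions create an equivalent 10-second ProcessingTimeTrigger (a quick sketch; note that the Duration variant takes a scala.concurrent.duration.Duration):
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

Trigger.ProcessingTime(10.seconds)           // Duration
Trigger.ProcessingTime(10 * 1000L)           // intervalMs
Trigger.ProcessingTime(10, TimeUnit.SECONDS) // interval with a TimeUnit
Trigger.ProcessingTime("10 seconds")         // interval as String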
DataStreamWriter¶
The Trigger of a streaming query is defined using DataStreamWriter.trigger (and defaults to ProcessingTime(0), i.e. run micro-batches as fast as possible).
Demo: Trigger.Once¶
import org.apache.spark.sql.streaming.Trigger
val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

// Wait for the one and only micro-batch to be processed and the query to stop
query.awaitTermination()
assert(query.isActive == false)
println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}
IllegalStateException: Custom Triggers Not Supported¶
Although Trigger allows for custom implementations, the stream execution engines (e.g. MicroBatchExecution) refuse such attempts and report an IllegalStateException at query start.
import org.apache.spark.sql.streaming.Trigger
case object MyTrigger extends Trigger

val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(MyTrigger) // <-- use custom trigger
  .queryName("rate-custom-trigger")
  .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:56)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:279)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:427)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:406)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
... 47 elided