Demo: Using File Streaming Source
This demo shows a streaming query that reads files using FileStreamSource.
Prerequisites
Make sure that the source directory is available before starting the query.
mkdir /tmp/text-logs
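Optionally, pre-populate the directory with a few sample text files so the first micro-batches have data to process. A minimal sketch (the file names and contents below are arbitrary, made up for this demo):

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Write three small text files into the monitored directory.
// With maxFilesPerTrigger=1 (used below), FileStreamSource
// picks them up one per micro-batch.
(1 to 3).foreach { i =>
  Files.write(
    Paths.get("/tmp/text-logs", s"sample-$i.txt"),
    s"sample line $i".getBytes(StandardCharsets.UTF_8))
}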
Configure Logging
Enable logging for FileStreamSource.
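One way to do it right from spark-shell, assuming a Spark build that uses log4j 1.x (Spark 3.2 and earlier), is to raise the level of the FileStreamSource logger:

import org.apache.log4j.{Level, Logger}

// Assumes log4j 1.x on the classpath; log everything FileStreamSource does
Logger.getLogger("org.apache.spark.sql.execution.streaming.FileStreamSource")
  .setLevel(Level.ALL)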
Start Streaming Query
Use spark-shell for fast interactive prototyping.
Describe a source to load data from.
// Describe the text source and cap each micro-batch at one new file
val lines = spark
  .readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("/tmp/text-logs")
Show the schema.
scala> lines.printSchema
root
|-- value: string (nullable = true)
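You can also double-check that lines is a streaming Dataset (isStreaming is part of the Dataset API).

scala> lines.isStreaming
res1: Boolean = true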
Describe the sink (console) and start the streaming query.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Trigger a micro-batch every 15 seconds
val interval = 15.seconds
val trigger = Trigger.ProcessingTime(interval)
val queryName = s"one file every micro-batch (every $interval)"

// Print every micro-batch to the console and start the query
val sq = lines
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpointLocation")
  .trigger(trigger)
  .queryName(queryName)
  .start
Use the web UI (http://localhost:4040) to monitor the query.
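The query can also be inspected from the shell using the StreamingQuery handle, e.g. its status and progress (lastProgress is null until the first micro-batch completes):

// Current status of the query (trigger activity, data availability)
sq.status

// Metrics of the most recent completed micro-batch (null before the first one)
sq.lastProgress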
Stop Query
// Stop all active streaming queries (including the demo query)
spark.streams.active.foreach(_.stop)