Demo: Streaming Query for Running Counts (Socket Source and Complete Output Mode)

The following code shows a streaming aggregation (using the Dataset.groupBy operator) in Complete output mode that reads lines of text from a socket (the socket data source) and outputs the running counts of the words to the console.

NOTE: The example is "borrowed" from http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html[the official documentation of Spark]. Changes and errors are mine alone.

IMPORTANT: Run `nc -lk 9999` in a terminal before running the demo.

// START: Only for easier debugging
// Reduce the number of shuffle partitions
// so that the state is kept in a single partition,
// which should make monitoring easier
val numShufflePartitions = 1
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, numShufflePartitions)

assert(spark.sessionState.conf.numShufflePartitions == numShufflePartitions)
// END: Only for easier debugging
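
Equivalently, the number of shuffle partitions can be changed with the public configuration API (an alternative sketch, not part of the original demo):

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)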

val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load

scala> lines.printSchema
root
 |-- value: string (nullable = true)

import org.apache.spark.sql.functions.{explode, split}
val words = lines
  .select(explode(split($"value", """\W+""")) as "word")

val counts = words.groupBy("word").count
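
For reference, the same aggregation can be expressed with the typed Dataset API (a sketch, not part of the original demo; countsTyped is a hypothetical name):

// Typed variant: group by the word itself and count per group
val countsTyped = words.as[String].groupByKey(identity).count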

scala> counts.printSchema
root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)

// nc -lk 9999 is supposed to be up at this point

val queryName = "running_counts"
val checkpointLocation = s"/tmp/checkpoint-$queryName"

// Delete the checkpoint location from previous executions
import java.nio.file.{Files, FileSystems}
import java.util.Comparator
import scala.collection.JavaConverters._
val path = FileSystems.getDefault.getPath(checkpointLocation)
if (Files.exists(path)) {
  Files.walk(path)
    .sorted(Comparator.reverseOrder())
    .iterator
    .asScala
    .foreach(p => p.toFile.delete)
}
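
Alternatively, you could delete the checkpoint location with Hadoop's FileSystem API, which is already on Spark's classpath and also works for non-local locations (a sketch, not part of the original demo):

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(checkpointLocation), true) // true = recursive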

import org.apache.spark.sql.streaming.OutputMode.Complete
val runningCounts = counts
  .writeStream
  .format("console")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(Complete)
  .start
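
Beside the web UI, the streaming query can be monitored programmatically (these StreamingQuery methods are part of the public API):

runningCounts.status       // whether the query is active, waiting for data, etc.
runningCounts.lastProgress // metrics of the most recent micro-batch (null before the first one)
spark.streams.active       // all active streaming queries of this session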

scala> runningCounts.explain
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@42363db7
+- *(5) HashAggregate(keys=[word#72], functions=[count(1)])
   +- StateStoreSave [word#72], state info [ checkpoint = file:/tmp/checkpoint-running_counts/state, runId = f3b2e642-1790-4a17-ab61-3d894110b063, opId = 0, ver = 0, numPartitions = 1], Complete, 0, 2
      +- *(4) HashAggregate(keys=[word#72], functions=[merge_count(1)])
         +- StateStoreRestore [word#72], state info [ checkpoint = file:/tmp/checkpoint-running_counts/state, runId = f3b2e642-1790-4a17-ab61-3d894110b063, opId = 0, ver = 0, numPartitions = 1], 2
            +- *(3) HashAggregate(keys=[word#72], functions=[merge_count(1)])
               +- Exchange hashpartitioning(word#72, 1)
                  +- *(2) HashAggregate(keys=[word#72], functions=[partial_count(1)])
                     +- Generate explode(split(value#83, \W+)), false, [word#72]
                        +- *(1) Project [value#83]
                           +- *(1) ScanV2 socket[value#83] (Options: [host=localhost,port=9999])

// Type lines (words) in the terminal with nc
// Observe the counts in spark-shell

// Use the web UI to monitor the state of state (no pun intended)
// The StateStoreSave and StateStoreRestore operators both have state metrics
// Go to http://localhost:4040/SQL/ and click one of the Completed Queries with Job IDs
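
The same state metrics are also available programmatically via StreamingQueryProgress (a sketch; the fields below are part of the public API):

// Print the per-operator state metrics of the most recent micro-batch (if any)
Option(runningCounts.lastProgress).foreach { progress =>
  progress.stateOperators.foreach { op =>
    println(s"numRowsTotal=${op.numRowsTotal} numRowsUpdated=${op.numRowsUpdated} memoryUsedBytes=${op.memoryUsedBytes}")
  }
}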

// You may also want to check out checkpointed state
// in /tmp/checkpoint-running_counts/state/0/0
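
To list the checkpointed files from the same shell session (a sketch reusing the imports and path from the cleanup snippet above):

Files.walk(path).iterator.asScala.foreach(println)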

// Eventually...
runningCounts.stop()
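
In a standalone application (outside spark-shell) you would typically block the main thread until the query terminates instead of stopping it interactively:

// Blocks until the query stops (or rethrows the exception the query failed with)
runningCounts.awaitTermination()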