MemoryStream¶
MemoryStream is a MemoryStreamBase and a MicroBatchStream for Micro-Batch Stream Processing.
Demo¶
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlContext = spark.sqlContext
val input = MemoryStream[String](numPartitions = 8)
val input2 = MemoryStream[String]
val df1 = input.toDF()
  .select($"value", $"value")
import org.apache.spark.sql.streaming.GroupState
// A placeholder state function for mapGroupsWithState (replace ??? with real logic).
// The key and value types follow input2 (String); the Long state and String result
// types are chosen just so the demo type-checks.
val stateFunc: (String, Iterator[String], GroupState[Long]) => String = ???
val df2 = input2.toDS()
  .groupByKey(identity)
  .mapGroupsWithState(stateFunc)
  .toDF()
input.addData("1", "2", "3", "4", "5")
import org.apache.spark.sql.streaming.Trigger
val sq = input
  .toDF()
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow)
  .start
// Trigger.AvailableNow processes all the available data and then stops the query
sq.awaitTermination()
assert(sq.isActive == false)
Creating Instance¶
MemoryStream takes the following to be created:
- ID
- SQLContext (Spark SQL)
- Number of partitions
MemoryStream is created using the apply factory method.
Number of Partitions¶
MemoryStream can be given the number of partitions when created. It is undefined (None) by default.
When specified, MemoryStream uses the number of partitions in planInputPartitions to redistribute rows into the given number of partitions in a round-robin manner.
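The round-robin redistribution can be pictured with a short sketch (plain Scala collections, not MemoryStream's actual code; the rows and numPartitions values are made up for illustration):
// Round-robin sketch: deal rows one by one into numPartitions buckets
val rows = Seq("1", "2", "3", "4", "5")
val numPartitions = 2
val partitions: Seq[Seq[String]] =
  (0 until numPartitions).map { p =>
    rows.zipWithIndex.collect { case (row, i) if i % numPartitions == p => row }
  }
// partitions: Seq(Seq("1", "3", "5"), Seq("2", "4"))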
Creating MemoryStream¶
apply[A : Encoder](
  numPartitions: Int)(
  implicit sqlContext: SQLContext): MemoryStream[A]
apply[A : Encoder](
  implicit sqlContext: SQLContext): MemoryStream[A]
apply creates a MemoryStream with a unique id (using an internal AtomicInteger counter).
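For example (in a spark-shell session, so Spark's implicit encoders are already in scope; the variable names are only for illustration):
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlContext = spark.sqlContext
// the number of partitions is undefined (None) by default
val ints = MemoryStream[Int]
// an explicit number of partitions
val strings = MemoryStream[String](numPartitions = 2)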
Input Partitions¶
planInputPartitions(
start: OffsetV2,
end: OffsetV2): Array[InputPartition]
planInputPartitions is part of the MicroBatchStream abstraction.
planInputPartitions...FIXME
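As a rough mental model only (a hypothetical sketch, not the actual implementation), the start and end offsets can be read as ordinals into the batches registry, and the batches in-between become the input partitions (possibly redistributed as described in Number of Partitions):
// Hypothetical sketch: treat the start and end offsets as ordinals into the
// batches registry and expose the batches in-between as input partitions
def planInputPartitionsSketch(
    batches: IndexedSeq[Seq[String]],  // one entry per addData call
    startOrdinal: Int,
    endOrdinal: Int): Seq[Seq[String]] =
  batches.slice(startOrdinal, endOrdinal)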
Adding Data¶
addData(
data: TraversableOnce[A]): Offset
addData is part of the MemoryStreamBase abstraction.
addData creates UnsafeRows from the input data (using the toRow serializer).
addData prints out the following DEBUG message to the logs:
Adding: [data]
addData increments the currentOffset counter and adds the rows to the batches registry.
In the end, addData returns the currentOffset.
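For example, with the input stream from the demo above (the values are arbitrary):
// addData accepts a TraversableOnce of elements and returns the Offset of the new batch
val offset = input.addData(Seq("6", "7", "8"))
// the varargs variant (as used in the demo) adds individual elements
input.addData("9", "10")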
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=ALL
Refer to Logging.