# MemoryStream

`MemoryStream` is a MemoryStreamBase and a MicroBatchStream for Micro-Batch Stream Processing.
## Demo

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val sqlContext = spark.sqlContext
import spark.implicits._

val input = MemoryStream[String](numPartitions = 8)
val input2 = MemoryStream[String]

val df1 = input.toDF()
  .select($"value", $"value")

import org.apache.spark.sql.streaming.GroupState

// A placeholder state function with concrete types (String keys and values, Long state).
// Replace ??? with an actual implementation before running the query.
val stateFunc: (String, Iterator[String], GroupState[Long]) => String = ???
val df2 = input2.toDS()
  .groupByKey(identity)
  .mapGroupsWithState(stateFunc)
  .toDF()

input.addData("1", "2", "3", "4", "5")

import org.apache.spark.sql.streaming.Trigger

val sq = input
  .toDF()
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start

// With Trigger.AvailableNow the query stops once all available data has been processed
sq.awaitTermination()
assert(sq.isActive == false)
```
## Creating Instance

`MemoryStream` takes the following to be created:

- ID
- `SQLContext` (Spark SQL)
- Number of partitions

`MemoryStream` is created using the apply factory method.
## Number of Partitions

`MemoryStream` can be given the number of partitions when created. It is undefined (`None`) by default.

When specified, `MemoryStream` uses the number of partitions in planInputPartitions to redistribute rows across the given number of partitions in a round-robin manner.
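The redistribution itself is plain round-robin assignment. A minimal sketch of the idea, using plain Scala collections (this is only an illustration, not MemoryStream's actual code):

```scala
// Round-robin redistribution of rows across a requested number of partitions.
def roundRobin[T](rows: Seq[T], numPartitions: Int): Seq[Seq[T]] = {
  val buckets = Array.fill(numPartitions)(Seq.newBuilder[T])
  rows.zipWithIndex.foreach { case (row, i) =>
    buckets(i % numPartitions) += row
  }
  buckets.map(_.result()).toSeq
}

// e.g. roundRobin(Seq("1", "2", "3", "4", "5"), 2) == Seq(Seq("1", "3", "5"), Seq("2", "4"))
```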
## Creating MemoryStream

```scala
apply[A : Encoder](
  numPartitions: Int)(
  implicit sqlContext: SQLContext): MemoryStream[A]
apply[A : Encoder](
  implicit sqlContext: SQLContext): MemoryStream[A]
```

`apply` creates a `MemoryStream` with a unique id (using an internal `AtomicInteger` counter).
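The unique id generation follows the standard `AtomicInteger` counter idiom; a minimal sketch (the counter name below is a hypothetical one for illustration, not the actual field in MemoryStream's companion object):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Thread-safe, monotonically increasing ids from a shared counter.
// memoryStreamId is a hypothetical name used only for this illustration.
val memoryStreamId = new AtomicInteger(0)

def nextId(): Int = memoryStreamId.getAndIncrement()
```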
## Input Partitions

```scala
planInputPartitions(
  start: OffsetV2,
  end: OffsetV2): Array[InputPartition]
```

`planInputPartitions` is part of the MicroBatchStream abstraction.

`planInputPartitions`...FIXME
## Adding Data

```scala
addData(
  data: TraversableOnce[A]): Offset
```

`addData` is part of the MemoryStreamBase abstraction.

`addData` creates `UnsafeRow`s for the given data (using the `toRow` serializer).

`addData` prints out the following DEBUG message to the logs:

```text
Adding: [data]
```

`addData` increments the currentOffset counter and adds the rows to the batches registry.

In the end, `addData` returns the currentOffset.
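For example, with the `input` stream from the Demo above, `addData` can be given a collection, or individual elements through the varargs variant used in the Demo, and returns the offset up to which data has been added:

```scala
// Using the input stream from the Demo above.
val offset1 = input.addData(Seq("6", "7", "8"))   // TraversableOnce variant
val offset2 = input.addData("9", "10")            // varargs variant (as in the Demo)
```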
## Logging

Enable `ALL` logging level for the `org.apache.spark.sql.execution.streaming.MemoryStream` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=ALL
```

Refer to Logging.