TextSocketSource¶
TextSocketSource is a streaming source that reads text lines from a socket at the given host and port.

TextSocketSource uses the internal lines in-memory buffer to keep all of the lines that were read from the socket (forever).
Caution
This source is not for production use due to design constraints, e.g. the infinite in-memory collection of lines read and the lack of fault recovery.
It is designed only for tutorials and debugging.
Creating Instance¶
When TextSocketSource is created (see TextSocketSourceProvider), it gets 4 parameters passed in:

- host
- port
- includeTimestamp flag
- SQLContext

CAUTION: It appears that the source did not get "renewed" to use SparkSession instead.
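The first three parameters surface as options of the DataStreamReader API. A minimal sketch (assuming a running Spark session and a socket server at localhost:9999, e.g. nc -lk 9999):

```scala
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.getOrCreate()

// host and port are required; includeTimestamp is optional and
// defaults to false (a single value column in the result).
val withTimestamps = spark.readStream.
  format("socket").
  option("host", "localhost").
  option("port", 9999).
  option("includeTimestamp", true).
  load
```

With includeTimestamp enabled, every line is paired with the time it was read.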
It opens a socket at the given host and port and reads a buffering character-input stream line by line, using the default charset and the default-sized input buffer (of 8192 bytes).
CAUTION: FIXME Review Java's Charset.defaultCharset()
It starts a readThread daemon thread (called TextSocketSource(host, port)) to read lines from the socket. The lines are added to the internal lines buffer.
lines Internal Buffer¶
lines: ArrayBuffer[(String, Timestamp)]
lines is the internal buffer of all the lines TextSocketSource has read from the socket.
Maximum Available Offset (getOffset method)¶
TextSocketSource's offset can either be none or a LongOffset of the number of lines in the internal lines buffer.
getOffset
is a part of the Source abstraction.
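The offset logic can be sketched with plain Scala (a hypothetical buffer stands in for the internal lines buffer, and Option[Long] stands in for LongOffset):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for the internal lines buffer.
val lines = ArrayBuffer.empty[String]

// getOffset semantics: None while no lines have been read,
// otherwise an offset derived from the buffer size.
def getOffset: Option[Long] =
  if (lines.isEmpty) None else Some(lines.size - 1L)

assert(getOffset.isEmpty)
lines += "first line"
assert(getOffset.contains(0L))
```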
Schema (schema method)¶
TextSocketSource supports two schemas:

- A single value field of StringType type.
- A value field of StringType type and a timestamp field of TimestampType type of format yyyy-MM-dd HH:mm:ss.
Tip
Refer to sourceSchema for TextSocketSourceProvider.
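The two schemas can be written out with the Spark SQL types API (a sketch, assuming spark-sql on the classpath):

```scala
import org.apache.spark.sql.types._

// Default: a single value field
val schemaNoTimestamp = StructType(
  StructField("value", StringType) :: Nil)

// With includeTimestamp enabled: value plus a timestamp field
val schemaWithTimestamp = StructType(
  StructField("value", StringType) ::
  StructField("timestamp", TimestampType) :: Nil)
```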
Stopping TextSocketSource¶
When stopped, TextSocketSource closes the socket connection.
Demo¶
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder.getOrCreate()
// Connect to localhost:9999
// You can use "nc -lk 9999" for demos
val textSocket = spark.
readStream.
format("socket").
option("host", "localhost").
option("port", 9999).
load
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = textSocket.as[String].map(_.toUpperCase)
val query = lines.writeStream.format("console").start
// Start typing the lines in nc session
// They will appear UPPERCASE in the terminal
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+
| value|
+---------+
|UPPERCASE|
+---------+
scala> query.explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#21]
+- *MapElements <function1>, obj#20: java.lang.String
+- *DeserializeToObject value#43.toString, obj#19: java.lang.String
+- LocalTableScan [value#43]
scala> query.stop