# FileStreamSink

`FileStreamSink` is a streaming sink that writes data out to files (in a given file format and a directory).

`FileStreamSink` can only be used with the `Append` output mode.
Tip
Learn more in Demo: Deep Dive into FileStreamSink.
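For example, the file sink is what `DataStreamWriter` uses under the covers for file-based formats. A minimal sketch (the paths below are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-sink-demo").getOrCreate()

// Any streaming source works; `rate` simply generates rows for the demo.
val input = spark.readStream.format("rate").load()

// A file-based format (parquet, json, csv, ...) with Append output mode
// makes DataSource create a FileStreamSink behind the scenes.
val query = input.writeStream
  .format("parquet")
  .option("path", "/tmp/file-sink-demo/output")           // output directory (placeholder)
  .option("checkpointLocation", "/tmp/file-sink-demo/ck") // required for streaming queries
  .outputMode("append")
  .start()
```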
## Creating Instance

`FileStreamSink` takes the following to be created:

- SparkSession
- Path
- FileFormat
- Names of the Partition Columns (if any)
- Options (`Map[String, String]`)
`FileStreamSink` is created when `DataSource` is requested to create a streaming sink for `FileFormat` data sources.
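For illustration, this is roughly what such an instantiation looks like. `FileStreamSink` is an internal class, so treat the sketch (and the placeholder arguments) as an approximation rather than a stable API:

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.FileStreamSink

// A sketch only; DataSource normally creates FileStreamSink internally.
val sink = new FileStreamSink(
  spark,                        // SparkSession
  "/tmp/file-sink-demo/output", // path (placeholder)
  new ParquetFileFormat(),      // FileFormat
  Seq.empty,                    // names of the partition columns
  Map.empty)                    // options
```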
## Metadata Log Directory

`FileStreamSink` uses the `_spark_metadata` directory (under the path) as the Metadata Log Directory to store metadata about which files are valid and can be read (and to skip batches that have already been committed).

The Metadata Log Directory is managed by FileStreamSinkLog.
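The manifest files in the metadata log are plain text (a version line followed by JSON entries for the written files), so they can be peeked at directly. Assuming a query has written to the placeholder output directory used earlier:

```scala
// Each file under _spark_metadata holds the manifest of one committed batch.
spark.read
  .text("/tmp/file-sink-demo/output/_spark_metadata")
  .show(truncate = false)
```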
## Hadoop Path of Metadata Log

```scala
logPath: Path
```

`logPath` is the location of the Metadata Log (as a Hadoop `Path`).
## FileStreamSinkLog

```scala
fileLog: FileStreamSinkLog
```

`fileLog` is a FileStreamSinkLog (for version 1 and the metadata log path).

`fileLog` is used for "adding" a batch.
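A sketch of how `fileLog` is wired up, assuming a `FileStreamSinkLog(version, session, path)` constructor as in recent Spark sources:

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

// Version 1 of the sink log format, anchored at the metadata log path.
val fileLog = new FileStreamSinkLog(
  FileStreamSinkLog.VERSION, // 1
  sparkSession,
  logPath.toString)
```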
## Text Representation

`FileStreamSink` uses the path for the text representation (`toString`):

```text
FileSink[path]
```
"Adding" Batch of Data to Sink¶
addBatch(
batchId: Long,
data: DataFrame): Unit
addBatch
requests the FileStreamSinkLog for the latest committed batch ID.
With a newer batchId
, addBatch
creates a FileCommitProtocol
based on spark.sql.streaming.commitProtocolClass configuration property.
The Internals of Apache Spark
Learn more about FileCommitProtocol in The Internals of Apache Spark.
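The configuration property can be inspected on a live session. The default shown in the comment is what recent Spark versions ship with; treat it as an assumption for your version:

```scala
// The class addBatch instantiates as the FileCommitProtocol.
val protocolClass = spark.conf.get("spark.sql.streaming.commitProtocolClass")
// e.g. org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
println(protocolClass)
```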
For a ManifestFileCommitProtocol, `addBatch` requests it to setupManifestOptions (with the FileStreamSinkLog and the given `batchId`).

In the end, `addBatch` writes out the data using the FileFormatWriter.write workflow (with the FileCommitProtocol and a BasicWriteJobStatsTracker).
The Internals of Spark SQL
Learn more about FileFormatWriter in The Internals of Spark SQL.
`addBatch` prints out the following INFO message to the logs when the given `batchId` is at or below the latest committed batch ID:

```text
Skipping already committed batch [batchId]
```

`addBatch` is a part of the Sink abstraction.
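Putting the steps together, a condensed paraphrase of the control flow (member values such as `fileLog`, `path` and `sparkSession` come from the class, so the sketch is not standalone-runnable, and local names are illustrative):

```scala
// Sketch of the addBatch flow described above.
def addBatch(batchId: Long, data: DataFrame): Unit = {
  // Latest committed batch ID (-1 when the metadata log is empty).
  val latestBatchId = fileLog.getLatest().map(_._1).getOrElse(-1L)

  if (batchId <= latestBatchId) {
    // Already committed: skip the batch.
    logInfo(s"Skipping already committed batch $batchId")
  } else {
    // FileCommitProtocol per spark.sql.streaming.commitProtocolClass.
    val committer = FileCommitProtocol.instantiate(
      sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
      jobId = batchId.toString,
      outputPath = path)

    committer match {
      case manifest: ManifestFileCommitProtocol =>
        // Gives the committer the sink log and the batch to commit.
        manifest.setupManifestOptions(fileLog, batchId)
      case _ => // other protocols need no extra setup
    }

    // Hands off to the FileFormatWriter.write workflow with the committer
    // and the BasicWriteJobStatsTracker (signature elided in this sketch).
    // FileFormatWriter.write(..., committer, ..., statsTrackers = Seq(basicWriteJobStatsTracker), ...)
  }
}
```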
## Creating BasicWriteJobStatsTracker

```scala
basicWriteJobStatsTracker: BasicWriteJobStatsTracker
```

`basicWriteJobStatsTracker` creates a BasicWriteJobStatsTracker with the basic metrics:

- number of written files
- bytes of written output
- number of output rows
- number of dynamic partitions
Tip
Learn more about BasicWriteJobStatsTracker in The Internals of Spark SQL online book.
`basicWriteJobStatsTracker` is used when `FileStreamSink` is requested to addBatch.
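A sketch of creating such a tracker, assuming the `BasicWriteJobStatsTracker(serializableConf, metrics)` constructor of recent Spark versions (the metric keys are illustrative):

```scala
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.SerializableConfiguration

val serializableHadoopConf =
  new SerializableConfiguration(spark.sessionState.newHadoopConf())

// One SQLMetric per basic metric listed above (keys are illustrative).
val metrics = Map(
  "numFiles" -> SQLMetrics.createMetric(spark.sparkContext, "number of written files"),
  "numOutputBytes" -> SQLMetrics.createSizeMetric(spark.sparkContext, "bytes of written output"),
  "numOutputRows" -> SQLMetrics.createMetric(spark.sparkContext, "number of output rows"),
  "numParts" -> SQLMetrics.createMetric(spark.sparkContext, "number of dynamic partitions"))

val tracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
```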
## hasMetadata Utility

```scala
hasMetadata(
  path: Seq[String],
  hadoopConf: Configuration): Boolean
```

`hasMetadata`...FIXME
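Pending the description above, a plausible sketch of the check based on the `_spark_metadata` convention (treat the details as assumptions, not the actual source):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: true when the single input path is a directory that contains
// a _spark_metadata subdirectory written by a FileStreamSink.
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean =
  path match {
    case Seq(singlePath) =>
      val hdfsPath = new Path(singlePath)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      fs.isDirectory(hdfsPath) &&
        fs.exists(new Path(hdfsPath, "_spark_metadata"))
    case _ => false
  }
```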
`hasMetadata` is used (to short-circuit listing files using MetadataLogFileIndex instead of using the HDFS API) when:

- DataSource (Spark SQL) is requested to resolve a FileFormat relation
- FileTable (Spark SQL) is requested for a PartitioningAwareFileIndex
- FileStreamSource is requested to fetchAllFiles
## Logging

Enable `ALL` logging level for `org.apache.spark.sql.execution.streaming.FileStreamSink` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL
```

Refer to Logging.