FileStreamSink is a streaming sink that writes data out to files (in a given file format and a directory).
FileStreamSink can only be used with Append output mode.
Learn more in Demo: Deep Dive into FileStreamSink.
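As an illustration, a minimal streaming query that writes to a file sink could look as follows (a sketch; the directories /tmp/in, /tmp/out and /tmp/checkpoint are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val input = spark.readStream
  .format("text")
  .load("/tmp/in")                                 // placeholder input directory

val query = input.writeStream
  .format("parquet")                               // any FileFormat data source (parquet, csv, json, ...)
  .option("checkpointLocation", "/tmp/checkpoint") // placeholder checkpoint directory
  .outputMode(OutputMode.Append)                   // FileStreamSink supports Append only
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start("/tmp/out")                               // placeholder output directory
```

Requesting any other output mode (e.g. Complete) for a file sink fails at query start with an AnalysisException.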
FileStreamSink takes the following to be created:
- SparkSession
- Path of the output directory
- FileFormat
- Names of the Partition Columns (if any)
- Options (Map[String, String])
FileStreamSink is created when DataSource is requested to create a streaming sink for FileFormat data sources.
Metadata Log Directory

FileStreamSink uses the _spark_metadata directory (under the path) as the Metadata Log Directory to store metadata indicating which files are valid and can be read (and to skip batches that have already been committed).
Metadata Log Directory is managed by FileStreamSinkLog.
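The metadata log is also what makes the sink's output safe to read back. A sketch of the read side (the output path is a placeholder): when a batch query reads the output directory, Spark detects the _spark_metadata directory and lists only files of committed batches.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Spark notices /tmp/out/_spark_metadata (the Metadata Log Directory) and uses
// a MetadataLogFileIndex, so only files of committed batches are read, even if
// stray or partially-written files exist in the directory.
val committedOnly = spark.read.parquet("/tmp/out") // placeholder output directory
```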
Hadoop Path of Metadata Log
logPath is the location of the Metadata Log (as a Hadoop Path).
fileLog is a FileStreamSinkLog (for version 1 and the metadata log path).

fileLog is used for "adding" a batch.
FileStreamSink uses the path for the text representation (toString):

FileSink[path]
"Adding" Batch of Data to Sink
```scala
addBatch(
  batchId: Long,
  data: DataFrame): Unit
```
addBatch requests the FileStreamSinkLog for the latest committed batch ID.

With a newer batchId, addBatch creates a FileCommitProtocol based on the spark.sql.streaming.commitProtocolClass configuration property.
Learn more on FileCommitProtocol in The Internals of Apache Spark.
For a ManifestFileCommitProtocol, addBatch requests it to setupManifestOptions (with the FileStreamSinkLog and the given batchId).
In the end, addBatch writes out the data using the FileFormatWriter.write workflow (with the FileCommitProtocol and a BasicWriteJobStatsTracker).
Learn more on FileFormatWriter in The Internals of Spark SQL.
addBatch prints out the following INFO message to the logs when the given batchId is at or below the latest committed batch ID:

```text
Skipping already committed batch [batchId]
```
addBatch is a part of the Sink abstraction.
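Putting the steps above together, the control flow of addBatch can be sketched as follows (a simplification, not the actual Spark source; helper and configuration accessor names are assumptions):

```scala
// Simplified sketch of FileStreamSink.addBatch (details elided)
def addBatch(batchId: Long, data: DataFrame): Unit = {
  // 1. Ask the FileStreamSinkLog for the latest committed batch ID (-1 if none)
  if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    // Batch already committed: skip it (idempotent re-execution after recovery)
    logInfo(s"Skipping already committed batch $batchId")
  } else {
    // 2. Instantiate the commit protocol per spark.sql.streaming.commitProtocolClass
    val committer = FileCommitProtocol.instantiate(
      className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
      jobId = batchId.toString,
      outputPath = path)
    // 3. For a ManifestFileCommitProtocol, wire in the sink log and batch ID
    committer match {
      case manifest: ManifestFileCommitProtocol =>
        manifest.setupManifestOptions(fileLog, batchId)
      case _ => // other protocols need no manifest setup
    }
    // 4. Write the data out with the FileFormatWriter.write workflow,
    //    passing the committer and a BasicWriteJobStatsTracker
    // FileFormatWriter.write(...)
  }
}
```

The latest-committed-batch check is what makes the sink safe under at-least-once batch re-execution: replaying an already-committed batch becomes a no-op.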
basicWriteJobStatsTracker creates a BasicWriteJobStatsTracker with the basic metrics:
- number of written files
- bytes of written output
- number of output rows
- number of dynamic partitions
Learn more about BasicWriteJobStatsTracker in The Internals of Spark SQL online book.
basicWriteJobStatsTracker is used when FileStreamSink is requested to addBatch.
```scala
hasMetadata(
  path: Seq[String],
  hadoopConf: Configuration): Boolean
```
hasMetadata is used (to short-circuit listing files using MetadataLogFileIndex instead of using the HDFS API) when:

- DataSource (Spark SQL) is requested to resolve a relation
- FileTable (Spark SQL) is requested for a PartitioningAwareFileIndex
- FileStreamSource is requested to fetchAllFiles
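The essence of the check can be sketched as follows (a simplification, not the actual implementation; the real hasMetadata also guards against empty, multiple and glob paths):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified sketch: a directory is considered FileStreamSink output when it
// contains a _spark_metadata subdirectory (the Metadata Log Directory).
def looksLikeSinkOutput(path: String, hadoopConf: Configuration): Boolean = {
  val metadataPath = new Path(path, "_spark_metadata")
  val fs = metadataPath.getFileSystem(hadoopConf)
  fs.exists(metadataPath)
}
```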
Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSink logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL
```

Refer to Logging.