Skip to content

FileStreamSink

FileStreamSink is a streaming sink that writes data out to files (in a given file format and a directory).

FileStreamSink can only be used with Append output mode.

Creating Instance

FileStreamSink takes the following to be created:

  • SparkSession
  • Path
  • FileFormat
  • Names of the Partition Columns (if any)
  • Options (Map[String, String])

FileStreamSink is created when DataSource is requested to create a streaming sink for FileFormat data sources.

Metadata Log Directory

FileStreamSink uses _spark_metadata directory (under the path) as the Metadata Log Directory to store metadata indicating which files are valid and can be read (and skipping already committed batch).

Metadata Log Directory is managed by FileStreamSinkLog.

Hadoop Path of Metadata Log

logPath: Path

logPath is the location of the Metadata Log (as a Hadoop Path).

FileStreamSinkLog

fileLog: FileStreamSinkLog

fileLog is a FileStreamSinkLog (for the version 1 and the metadata log path)

Used for "adding" batch.

Text Representation

FileStreamSink uses the path for the text representation (toString):

FileSink[path]

"Adding" Batch of Data to Sink

addBatch(
  batchId: Long,
  data: DataFrame): Unit

addBatch requests the FileStreamSinkLog for the latest committed batch ID.

With a newer batchId, addBatch creates a FileCommitProtocol based on spark.sql.streaming.commitProtocolClass configuration property.

The Internals of Apache Spark

Learn more on FileCommitProtocol in The Internals of Apache Spark.

For a ManifestFileCommitProtocol, addBatch requests it to setupManifestOptions (with the FileStreamSinkLog and the given batchId).

In the end, addBatch writes out the data using FileFormatWriter.write workflow (with the FileCommitProtocol and BasicWriteJobStatsTracker).

The Internals of Spark SQL

Learn more on FileFormatWriter in The Internals of Spark SQL.

addBatch prints out the following INFO message to the logs when the given batchId is below the latest committed batch ID:

Skipping already committed batch [batchId]

addBatch is a part of the Sink abstraction.

Creating BasicWriteJobStatsTracker

basicWriteJobStatsTracker: BasicWriteJobStatsTracker

basicWriteJobStatsTracker creates a BasicWriteJobStatsTracker with the basic metrics:

  • number of written files
  • bytes of written output
  • number of output rows
  • number of dynamic partitions

Tip

Learn more about BasicWriteJobStatsTracker in The Internals of Spark SQL online book.

basicWriteJobStatsTracker is used when FileStreamSink is requested to addBatch.

hasMetadata Utility

hasMetadata(
  path: Seq[String],
  hadoopConf: Configuration): Boolean

hasMetadata...FIXME

hasMetadata is used (to short-circut listing files using MetadataLogFileIndex instead of using HDFS API) when:

  • DataSource (Spark SQL) is requested to resolve a FileFormat relation
  • FileTable (Spark SQL) is requested for a PartitioningAwareFileIndex
  • FileStreamSource is requested to fetchAllFiles

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSink logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL

Refer to Logging.