
FileStreamSource

FileStreamSource is a streaming source that reads files (in a given file format) from a directory.

FileStreamSource is used by DataSource.createSource for FileFormat-based data sources.

Creating Instance

FileStreamSource takes the following to be created:

  • SparkSession
  • Path
  • Class Name of FileFormat
  • Schema
  • Names of the Partition Columns (if any)
  • Metadata Path
  • Options (Map[String, String])

FileStreamSource is created when DataSource is requested to create a streaming source for FileFormat data sources.

While being created, FileStreamSource prints out the following INFO message to the logs. The maxFilesPerBatch and maxFileAgeMs values come from the maxFilesPerTrigger and maxFileAge options, respectively:

maxFilesPerBatch = [maxFilesPerBatch], maxFileAgeMs = [maxFileAgeMs]

FileStreamSource requests the FileStreamSourceLog for all the files it has already recorded and adds them to the seenFiles internal registry. FileStreamSource then requests the seenFiles registry to purge aged entries.
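The "restore, then purge" step can be pictured with a small standalone model. This is a simplified, hypothetical registry (the SeenFiles class is invented here and is not Spark's SeenFilesMap). It also assumes that "aged" means older than the newest recorded timestamp minus a maximum age.

```scala
import scala.collection.mutable

// Hypothetical, simplified "seen files" registry: file path -> timestamp (ms).
// Not Spark's SeenFilesMap; it only illustrates restoring entries and purging aged ones.
class SeenFiles(maxAgeMs: Long) {
  require(maxAgeMs >= 0, "maxAgeMs must be non-negative")

  private val files = mutable.HashMap.empty[String, Long]
  private var latestTimestamp = 0L

  // Record a file and remember the newest timestamp seen so far.
  def add(path: String, timestamp: Long): Unit = {
    files(path) = timestamp
    latestTimestamp = math.max(latestTimestamp, timestamp)
  }

  // Assumption of this sketch: entries older than (newest timestamp - maxAgeMs) are aged.
  // Returns the number of removed entries.
  def purge(): Int = {
    val threshold = latestTimestamp - maxAgeMs
    val aged = files.collect { case (path, ts) if ts < threshold => path }.toList
    files --= aged
    aged.size
  }

  def contains(path: String): Boolean = files.contains(path)
  def size: Int = files.size
}

// Entries as they might be restored from a metadata log: path -> modification time.
val restored = Seq("/in/a.json" -> 1000L, "/in/b.json" -> 5000L, "/in/c.json" -> 9000L)

val seen = new SeenFiles(maxAgeMs = 5000L)
restored.foreach { case (path, ts) => seen.add(path, ts) }
val removed = seen.purge() // threshold = 9000 - 5000 = 4000, so only a.json is dropped
```

Purging after restoring keeps the registry bounded by age rather than by the full history of the metadata log.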

Options

Options are case-insensitive (so cleanSource and CLEANSOURCE are equivalent).
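Case-insensitive lookup can be sketched by normalizing keys to lower case once. This is a standalone illustration under that assumption. Spark has its own case-insensitive map for data source options; CaseInsensitiveOptions below is a hypothetical name.

```scala
import java.util.Locale

// Hypothetical case-insensitive view over user-supplied options.
// Keys are normalized to lower case (Locale.ROOT) on construction and on lookup.
final class CaseInsensitiveOptions(original: Map[String, String]) {
  private val normalized: Map[String, String] =
    original.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def get(key: String): Option[String] = normalized.get(key.toLowerCase(Locale.ROOT))

  def getOrElse(key: String, default: => String): String = get(key).getOrElse(default)
}

val options = new CaseInsensitiveOptions(Map("CLEANSOURCE" -> "delete", "latestFirst" -> "true"))
```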

cleanSource

How to clean up completed files.

Available modes:

  • archive
  • delete
  • off
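The three modes can be modelled as a closed set of values parsed case-insensitively. In this sketch a missing value falls back to off, which matches the default documented for the file source. Rejecting unknown values with an IllegalArgumentException is an assumption of the sketch, and the CleanSourceMode type here is hypothetical, not Spark's own enumeration. The validate helper reflects the requirement that archive mode needs sourceArchiveDir.

```scala
import java.util.Locale

// Hypothetical model of the cleanSource modes (not Spark's own type).
sealed trait CleanSourceMode
object CleanSourceMode {
  case object Archive extends CleanSourceMode
  case object Delete extends CleanSourceMode
  case object Off extends CleanSourceMode

  private val byName: Map[String, CleanSourceMode] =
    Map("archive" -> Archive, "delete" -> Delete, "off" -> Off)

  // Case-insensitive parsing; a missing value means Off, an unknown value is rejected.
  def parse(value: Option[String]): CleanSourceMode =
    value match {
      case None => Off
      case Some(v) =>
        byName.getOrElse(
          v.toLowerCase(Locale.ROOT),
          throw new IllegalArgumentException(s"Invalid cleanSource mode: $v"))
    }

  // Archive mode needs a directory to move completed files to.
  def validate(mode: CleanSourceMode, sourceArchiveDir: Option[String]): Unit =
    require(mode != Archive || sourceArchiveDir.isDefined,
      "sourceArchiveDir must be set when cleanSource is 'archive'")
}
```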

fileNameOnly

Whether to check for new files based on the file name only (true) or the full path (false)

Default: false

When enabled, FileStreamSource prints out the following WARN message to the logs:

'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using UUID), otherwise, files with the same name but under different paths will be considered the same and causes data lost.
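The data-loss risk in this warning comes from the key used to recognize a file. The following hypothetical sketch assumes that, with fileNameOnly enabled, the key is everything after the last '/'. Under that assumption, two different files with the same name collapse into a single entry, so the second one would look already seen.

```scala
// Hypothetical key function: the full path, or only the file name when fileNameOnly is true.
def fileKey(path: String, fileNameOnly: Boolean): String =
  if (fileNameOnly) path.substring(path.lastIndexOf('/') + 1) else path

val arrived = Seq("/data/2024-01-01/part-0.json", "/data/2024-01-02/part-0.json")

val keysByPath = arrived.map(fileKey(_, fileNameOnly = false)).distinct // two distinct files
val keysByName = arrived.map(fileKey(_, fileNameOnly = true)).distinct  // collapses to one
```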

latestFirst

Whether to scan latest files first (true) or not (false)

Default: false

When enabled, FileStreamSource prints out the following WARN message to the logs:

'latestFirst' is true. New files will be processed first, which may affect the watermark value. In addition, 'maxFileAge' will be ignored.
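The effect of latestFirst on processing order can be sketched as sorting by modification time, with the order reversed when the option is enabled. This is a standalone model rather than Spark's code, and FileEntry is a hypothetical type.

```scala
// Hypothetical file listing entry.
final case class FileEntry(path: String, modificationTime: Long)

// Oldest first by default; newest first when latestFirst = true.
def orderForProcessing(files: Seq[FileEntry], latestFirst: Boolean): Seq[FileEntry] = {
  val byTime = Ordering.by[FileEntry, Long](_.modificationTime)
  files.sorted(if (latestFirst) byTime.reverse else byTime)
}

val listed = Seq(FileEntry("b.csv", 200L), FileEntry("a.csv", 100L), FileEntry("c.csv", 300L))
```

If the ordered list is then capped (for example, by maxFilesPerTrigger), newest-first ordering means the most recent files are admitted first. That is why the warning mentions a possible effect on the watermark.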

maxFileAge

Maximum age of a file in the directory before it is ignored (tracked internally as maxFileAgeMs)

Default: 7d

Supported time suffixes: us, ms, s, m, min, h, d. A value without a suffix is interpreted as milliseconds.
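Converting a maxFileAge value to milliseconds can be sketched as follows. The parser follows the suffix list above but is a standalone illustration, not Spark's own time-string utility. Its handling of whitespace, overflow, and microsecond rounding (truncation) is an assumption of this sketch.

```scala
import java.util.Locale

// Hypothetical parser for values such as "7d", "10min", or "1500" (milliseconds).
def timeStringAsMs(value: String): Long = {
  val pattern = """(\d+)([a-z]*)""".r
  value.trim.toLowerCase(Locale.ROOT) match {
    case pattern(number, suffix) =>
      val n = number.toLong
      suffix match {
        case "us"        => n / 1000L                  // microseconds, truncated to ms
        case "" | "ms"   => n                          // no suffix means milliseconds
        case "s"         => n * 1000L
        case "m" | "min" => n * 60L * 1000L
        case "h"         => n * 60L * 60L * 1000L
        case "d"         => n * 24L * 60L * 60L * 1000L
        case other       => throw new NumberFormatException(s"Invalid time suffix: '$other'")
      }
    case _ => throw new NumberFormatException(s"Invalid time string: '$value'")
  }
}
```

For example, the default of 7d corresponds to 7 × 24 × 60 × 60 × 1000 = 604800000 ms.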

maxFilesPerTrigger

Maximum number of files per trigger (batch)

sourceArchiveDir

Archive directory to move completed files to (for cleanSource set to archive)

SupportsAdmissionControl

FileStreamSource is a SupportsAdmissionControl and controls the rate of data ingestion.
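Admission control can be pictured as a read limit that caps how many newly discovered files go into the next micro-batch. Spark has its own ReadLimit types in its streaming connector API (for example, ReadAllAvailable and ReadMaxFiles). The Limit, AllAvailable, MaxFiles, defaultLimit, and admit names below are hypothetical stand-ins, and mapping an unset maxFilesPerTrigger to "no cap" is an assumption of this sketch.

```scala
// Hypothetical read limits: read everything available, or at most n files.
sealed trait Limit
case object AllAvailable extends Limit
final case class MaxFiles(n: Int) extends Limit { require(n > 0, "n must be positive") }

// In this sketch, an unset maxFilesPerTrigger means "no cap".
def defaultLimit(maxFilesPerTrigger: Option[Int]): Limit =
  maxFilesPerTrigger.map(MaxFiles(_)).getOrElse(AllAvailable)

// Picks the files admitted into the next micro-batch, in the given order.
def admit[A](newFiles: Seq[A], limit: Limit): Seq[A] = limit match {
  case AllAvailable => newFiles
  case MaxFiles(n)  => newFiles.take(n)
}

val pending = Seq("f1", "f2", "f3", "f4", "f5")
```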

FileStreamSourceCleaner

FileStreamSource may create a FileStreamSourceCleaner based on cleanSource option.

FileStreamSourceLog

FileStreamSource uses FileStreamSourceLog (for the given metadataPath).

Latest Offset

FileStreamSource tracks the latest offset in metadataLogCurrentOffset internal registry.
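A possible way to picture the latest offset is as a batch counter backed by a metadata log. In this simplified, hypothetical model, the counter starts at -1 (nothing recorded yet) and is incremented only when a new batch of files is recorded. The new batch id then becomes the latest offset. OffsetTracker and its methods are invented for illustration.

```scala
import scala.collection.mutable

// Hypothetical offset tracker: batch id -> files recorded for that batch.
final class OffsetTracker {
  private val metadataLog = mutable.LinkedHashMap.empty[Long, Seq[String]]
  private var currentOffset = -1L // -1 means "nothing recorded yet"

  def latestOffset: Long = currentOffset

  // Records a batch only when there are new files; otherwise the offset stays put.
  def recordBatch(newFiles: Seq[String]): Long = {
    if (newFiles.nonEmpty) {
      currentOffset += 1
      metadataLog(currentOffset) = newFiles
    }
    currentOffset
  }

  def filesFor(batchId: Long): Seq[String] = metadataLog.getOrElse(batchId, Seq.empty)
}

val tracker = new OffsetTracker
tracker.recordBatch(Seq("a.json", "b.json")) // offset 0
tracker.recordBatch(Seq.empty)               // no new files, offset stays 0
tracker.recordBatch(Seq("c.json"))           // offset 1
```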

Seen Files Registry

seenFiles: SeenFilesMap

seenFiles is...FIXME

seenFiles is used for...FIXME

Committing

commit(
  end: Offset): Unit

commit is...FIXME

commit is part of the Source abstraction.

getDefaultReadLimit

getDefaultReadLimit: ReadLimit

getDefaultReadLimit is...FIXME

getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.

getOffset

getOffset: Option[Offset]

getOffset simply throws an UnsupportedOperationException:

latestOffset(Offset, ReadLimit) should be called instead of this method

getOffset is part of the Source abstraction.

Generating DataFrame for Streaming Batch

getBatch(
  start: Option[Offset],
  end: Offset): DataFrame

getBatch...FIXME

FileStreamSource.getBatch asks the FileStreamSourceLog for the files of the batch.

You should see the following INFO and DEBUG messages in the logs:

Processing ${files.length} files from ${startId + 1}:$endId
Streaming ${files.mkString(", ")}
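The `${startId + 1}:$endId` range in the INFO message can be read as follows. A batch covers the metadata-log entries after the start offset, up to and including the end offset. The sketch below models the log as a plain Map and treats a missing start offset as -1. Both choices are assumptions for illustration, and filesForBatch is a hypothetical helper.

```scala
// Hypothetical helper: collects the files of log entries (startId + 1) to endId, inclusive.
def filesForBatch(
    metadataLog: Map[Long, Seq[String]],
    start: Option[Long],
    end: Long): Seq[String] = {
  val startId = start.getOrElse(-1L) // no start offset: begin with the first entry (id 0)
  ((startId + 1) to end).flatMap(id => metadataLog.getOrElse(id, Seq.empty))
}

val metadataLog = Map(0L -> Seq("a.json", "b.json"), 1L -> Seq("c.json"), 2L -> Seq("d.json"))
```

With no start offset and an end offset of 1, the message would report 3 files from 0:1.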

The method to create a result batch is given at instantiation time (as dataFrameBuilder constructor parameter).

getBatch is part of the Source abstraction.

fetchMaxOffset

fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset

fetchMaxOffset...FIXME

fetchMaxOffset is used for latestOffset.

fetchAllFiles

fetchAllFiles(): Seq[(String, Long)]

fetchAllFiles...FIXME

fetchAllFiles is used for fetchMaxOffset.

latestOffset

latestOffset(
  startOffset: streaming.Offset,
  limit: ReadLimit): streaming.Offset

latestOffset...FIXME

latestOffset is part of the SupportsAdmissionControl abstraction.

Stopping Streaming Source

stop(): Unit

stop...FIXME

stop is part of the SparkDataStream abstraction.

allFilesUsingInMemoryFileIndex

allFilesUsingInMemoryFileIndex(): Seq[FileStatus]

allFilesUsingInMemoryFileIndex is...FIXME

allFilesUsingInMemoryFileIndex is used for fetchAllFiles.

allFilesUsingMetadataLogFileIndex

allFilesUsingMetadataLogFileIndex(): Seq[FileStatus]

allFilesUsingMetadataLogFileIndex is...FIXME

allFilesUsingMetadataLogFileIndex is used for fetchAllFiles.

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSource logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSource=ALL

Refer to Logging.