FileStreamSource¶
FileStreamSource is a streaming source that reads files (in a given file format) from a directory.
FileStreamSource is used by DataSource.createSource for FileFormat.
Tip
Learn more in Demo: Using File Streaming Source.
Creating Instance¶
FileStreamSource takes the following to be created:
- SparkSession
- Path
- Class Name of FileFormat
- Schema
- Names of the Partition Columns (if any)
- Metadata Path
- Options (Map[String, String])
FileStreamSource is created when DataSource is requested to create a streaming source for FileFormat data sources.
While being created, FileStreamSource prints out the following INFO message to the logs (with the maxFilesPerBatch and maxFileAgeMs options):
maxFilesPerBatch = [maxFilesPerBatch], maxFileAgeMs = [maxFileAgeMs]
FileStreamSource requests the FileStreamSourceLog for all files that are added to seenFiles internal registry. FileStreamSource requests the seenFiles internal registry to purge (remove aged entries).
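The seen-files bookkeeping can be sketched in plain Scala (a simplified model of an age-based registry with purging, not the actual SeenFilesMap implementation; the class and member names are illustrative):

```scala
import scala.collection.mutable

// A simplified model of a seen-files registry with age-based purging.
// Entries map a file path to the timestamp it was first seen.
class SeenFilesSketch(maxAgeMs: Long) {
  private val entries = mutable.Map.empty[String, Long]
  private var latestTimestamp = 0L

  def add(path: String, timestampMs: Long): Unit = {
    entries(path) = timestampMs
    if (timestampMs > latestTimestamp) latestTimestamp = timestampMs
  }

  // A file counts as new if it has not been seen yet and is not older
  // than maxAgeMs relative to the latest timestamp observed so far.
  def isNewFile(path: String, timestampMs: Long): Boolean =
    timestampMs >= latestTimestamp - maxAgeMs && !entries.contains(path)

  // Remove aged entries so the registry does not grow without bound.
  def purge(): Int = {
    val threshold = latestTimestamp - maxAgeMs
    val aged = entries.filter { case (_, ts) => ts < threshold }.keys.toSeq
    aged.foreach(entries.remove)
    aged.size
  }

  def size: Int = entries.size
}
```

Purging old entries is what keeps the registry bounded even when the source directory accumulates files indefinitely.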
Options¶
Options are case-insensitive (so cleanSource and CLEANSOURCE are equivalent).
cleanSource¶
How to clean up completed files.

Default: off

Available modes:

- archive
- delete
- off
fileNameOnly¶
Whether to check for new files using only the filename (true) or the full path (false)
Default: false
When enabled, FileStreamSource prints out the following WARN message to the logs:
'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using UUID), otherwise, files with the same name but under different paths will be considered the same and causes data lost.
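The semantics of fileNameOnly can be sketched in plain Scala (an illustration of the option only, not the actual implementation; seenKey is a hypothetical helper):

```scala
// Derive the key used to decide whether a file was already seen:
// the full path by default, only the file name when fileNameOnly is enabled.
def seenKey(path: String, fileNameOnly: Boolean): String =
  if (fileNameOnly) path.substring(path.lastIndexOf('/') + 1) else path
```

With fileNameOnly enabled, files with the same name under different directories collapse to the same key, so the later one would be treated as already seen and silently skipped (the data loss the WARN message refers to).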
latestFirst¶
Whether to scan the latest files first (true) or not (false)
Default: false
When enabled, FileStreamSource prints out the following WARN message to the logs:
'latestFirst' is true. New files will be processed first, which may affect the watermark value. In addition, 'maxFileAge' will be ignored.
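The effect of latestFirst on processing order can be sketched in plain Scala (a simplified model; each pair stands for a file path and its modification time in milliseconds):

```scala
// Order candidate files by modification time: newest first when latestFirst
// is enabled, oldest first otherwise.
def ordered(files: Seq[(String, Long)], latestFirst: Boolean): Seq[(String, Long)] =
  if (latestFirst) files.sortBy(-_._2) else files.sortBy(_._2)
```

Processing the newest files first is why the option can move the watermark ahead of still-unprocessed older files.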
maxFileAgeMs¶
Maximum age of a file before it is ignored (older files in the directory are skipped)

Default: 7d

Supports time suffixes: us, ms, s, m, min, h, d. A value with no suffix is assumed to be in milliseconds.
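Suffix handling can be sketched in plain Scala (a simplified sketch of the documented suffixes only; Spark uses its own time-string parser internally):

```scala
// Parse a time string with an optional suffix into milliseconds.
// Suffixes as documented: us, ms, s, m, min, h, d; no suffix means ms.
def toMillis(s: String): Long = {
  val trimmed = s.trim.toLowerCase
  val (num, unit) = trimmed.span(_.isDigit)
  val n = num.toLong
  unit match {
    case ""          => n
    case "us"        => n / 1000
    case "ms"        => n
    case "s"         => n * 1000
    case "m" | "min" => n * 60 * 1000
    case "h"         => n * 60 * 60 * 1000
    case "d"         => n * 24 * 60 * 60 * 1000
    case other       => throw new IllegalArgumentException(s"Unknown suffix: $other")
  }
}
```

For example, the default of 7d corresponds to 604800000 ms.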
maxFilesPerTrigger¶
Maximum number of files per trigger (batch)
sourceArchiveDir¶
Archive directory to move completed files to (for cleanSource set to archive)
SupportsAdmissionControl¶
FileStreamSource is a SupportsAdmissionControl and can control the rate of data ingestion (e.g. using the maxFilesPerTrigger option).
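Admission control can be sketched as selecting at most a bounded number of unread files per batch (a simplified model; the types below are illustrative stand-ins, not Spark's actual ReadLimit hierarchy):

```scala
// A minimal read-limit model: either take everything available,
// or cap the number of files admitted into the next batch.
sealed trait ReadLimitSketch
case object AllAvailable extends ReadLimitSketch
final case class MaxFiles(n: Int) extends ReadLimitSketch

def selectBatch(unread: Seq[String], limit: ReadLimitSketch): Seq[String] =
  limit match {
    case AllAvailable => unread
    case MaxFiles(n)  => unread.take(n)
  }
```

Files not admitted in one batch remain unread and become candidates for the next trigger.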
FileStreamSourceCleaner¶
FileStreamSource may create a FileStreamSourceCleaner based on the cleanSource option.
FileStreamSourceLog¶
FileStreamSource uses FileStreamSourceLog (for the given metadataPath).
Latest Offset¶
FileStreamSource tracks the latest offset in metadataLogCurrentOffset internal registry.
Seen Files Registry¶
seenFiles: SeenFilesMap
seenFiles is...FIXME
seenFiles is used for...FIXME
Committing¶
commit(
end: Offset): Unit
commit is...FIXME
commit is part of the Source abstraction.
getDefaultReadLimit¶
getDefaultReadLimit: ReadLimit
getDefaultReadLimit is...FIXME
getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.
getOffset¶
getOffset: Option[Offset]
getOffset simply throws an UnsupportedOperationException:
latestOffset(Offset, ReadLimit) should be called instead of this method
getOffset is part of the Source abstraction.
Generating DataFrame for Streaming Batch¶
getBatch(
start: Option[Offset],
end: Offset): DataFrame
getBatch...FIXME
getBatch requests the FileStreamSourceLog for the files between the start and end offsets.
You should see the following INFO and DEBUG messages in the logs:
Processing ${files.length} files from ${startId + 1}:$endId
Streaming ${files.mkString(", ")}
The method to create a result batch is given at instantiation time (as dataFrameBuilder constructor parameter).
getBatch is part of the Source abstraction.
fetchMaxOffset¶
fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset
fetchMaxOffset...FIXME
fetchMaxOffset is used for latestOffset.
fetchAllFiles¶
fetchAllFiles(): Seq[(String, Long)]
fetchAllFiles...FIXME
fetchAllFiles is used for fetchMaxOffset.
latestOffset¶
latestOffset(
startOffset: streaming.Offset,
limit: ReadLimit): streaming.Offset
latestOffset...FIXME
latestOffset is part of the SparkDataStream abstraction.
Stopping Streaming Source¶
stop(): Unit
stop...FIXME
stop is part of the SparkDataStream abstraction.
allFilesUsingInMemoryFileIndex¶
allFilesUsingInMemoryFileIndex(): Seq[FileStatus]
allFilesUsingInMemoryFileIndex is...FIXME
allFilesUsingInMemoryFileIndex is used for fetchAllFiles.
allFilesUsingMetadataLogFileIndex¶
allFilesUsingMetadataLogFileIndex(): Seq[FileStatus]
allFilesUsingMetadataLogFileIndex is...FIXME
allFilesUsingMetadataLogFileIndex is used for fetchAllFiles.
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSource logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSource=ALL
Refer to Logging.