FileStreamSource¶
FileStreamSource is a streaming source that reads files (in a given file format) from a directory.

FileStreamSource is used by DataSource.createSource for FileFormat.
Tip
Learn more in Demo: Using File Streaming Source.
Creating Instance¶
FileStreamSource takes the following to be created:

- SparkSession
- Path
- Class Name of FileFormat
- Schema
- Names of the Partition Columns (if any)
- Metadata Path
- Options (Map[String, String])
FileStreamSource is created when DataSource is requested to create a streaming source for FileFormat data sources.

While being created, FileStreamSource prints out the following INFO message to the logs (with the maxFilesPerBatch and maxFileAgeMs options):

maxFilesPerBatch = [maxFilesPerBatch], maxFileAgeMs = [maxFileAgeMs]

FileStreamSource requests the FileStreamSourceLog for all files that are added to the seenFiles internal registry. FileStreamSource then requests the seenFiles internal registry to purge (remove aged entries).
Options¶
Options are case-insensitive (so cleanSource and CLEANSOURCE are equivalent).
cleanSource¶
How to clean up completed files.
Available modes:
- archive
- delete
- off
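A minimal sketch of wiring this up through DataStreamReader options (the paths are made up for the example; archive mode also needs sourceArchiveDir):

```scala
import org.apache.spark.sql.SparkSession

// Assumes a local Spark installation; paths are made up for the example.
val spark = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()

// cleanSource = archive moves completed files to sourceArchiveDir;
// delete removes them; off (the default) leaves them in place.
val lines = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/tmp/archive") // required for archive mode
  .load("/tmp/input")
```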
fileNameOnly¶
Whether to check for new files using the file name only (true) or the full path (false)

Default: false
When enabled, FileStreamSource prints out the following WARN message to the logs:
'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using UUID), otherwise, files with the same name but under different paths will be considered the same and causes data lost.
latestFirst¶
Whether to scan latest files first (true) or not (false)

Default: false
When enabled, FileStreamSource prints out the following WARN message to the logs:
'latestFirst' is true. New files will be processed first, which may affect the watermark value. In addition, 'maxFileAge' will be ignored.
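Both latestFirst and fileNameOnly are plain source options; a hedged sketch (assuming an active SparkSession in scope as spark, with a made-up input path):

```scala
// Assumes an active SparkSession in scope as `spark`.
val lines = spark.readStream
  .format("text")
  .option("latestFirst", "true")  // process newest files first (maxFileAge is then ignored)
  .option("fileNameOnly", "true") // track seen files by name only, not full path
  .load("/tmp/input")
```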
maxFileAgeMs¶
Maximum age of a file that can be found in this directory, before being ignored
Default: 7d
Uses time suffixes: us, ms, s, m, min, h, d. A value with no suffix is assumed to be in ms.
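For illustration (assuming the user-facing option key is maxFileAge, which backs the internal maxFileAgeMs value; the path is made up):

```scala
// Assumes an active SparkSession in scope as `spark`.
val lines = spark.readStream
  .format("text")
  .option("maxFileAge", "12h") // ignore files older than 12 hours
  .load("/tmp/input")
```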
maxFilesPerTrigger¶
Maximum number of files per trigger (batch)
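A sketch of capping a micro-batch at one file (the schema and path are made up; file sources such as csv require an explicit schema):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// Assumes an active SparkSession in scope as `spark`.
val schema = new StructType().add("value", StringType)
val rows = spark.readStream
  .format("csv")
  .schema(schema)                  // a user-defined schema is required for csv
  .option("maxFilesPerTrigger", 1) // at most 1 new file per micro-batch
  .load("/tmp/input")
```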
sourceArchiveDir¶
Archive directory to move completed files to (for cleanSource set to archive)
SupportsAdmissionControl¶
FileStreamSource is a SupportsAdmissionControl and controls the rate of data ingested.
FileStreamSourceCleaner¶
FileStreamSource may create a FileStreamSourceCleaner based on the cleanSource option.
FileStreamSourceLog¶
FileStreamSource uses a FileStreamSourceLog (for the given metadataPath).
Latest Offset¶
FileStreamSource tracks the latest offset in the metadataLogCurrentOffset internal registry.
Seen Files Registry¶
seenFiles: SeenFilesMap
seenFiles is...FIXME

seenFiles is used for...FIXME
Committing¶
commit(
end: Offset): Unit
commit is...FIXME

commit is part of the Source abstraction.
getDefaultReadLimit¶
getDefaultReadLimit: ReadLimit
getDefaultReadLimit is...FIXME

getDefaultReadLimit is part of the SupportsAdmissionControl abstraction.
getOffset¶
getOffset: Option[Offset]
getOffset simply throws an UnsupportedOperationException:

latestOffset(Offset, ReadLimit) should be called instead of this method

getOffset is part of the Source abstraction.
Generating DataFrame for Streaming Batch¶
getBatch(
start: Option[Offset],
end: Offset): DataFrame
getBatch...FIXME

FileStreamSource.getBatch asks...FIXME
You should see the following INFO and DEBUG messages in the logs:
Processing ${files.length} files from ${startId + 1}:$endId
Streaming ${files.mkString(", ")}
The method to create a result batch is given at instantiation time (as the dataFrameBuilder constructor parameter).
getBatch is part of the Source abstraction.
fetchMaxOffset¶
fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset
fetchMaxOffset...FIXME

fetchMaxOffset is used for latestOffset.
fetchAllFiles¶
fetchAllFiles(): Seq[(String, Long)]
fetchAllFiles...FIXME

fetchAllFiles is used for fetchMaxOffset.
latestOffset¶
latestOffset(
startOffset: streaming.Offset,
limit: ReadLimit): streaming.Offset
latestOffset...FIXME

latestOffset is part of the SparkDataStream abstraction.
Stopping Streaming Source¶
stop(): Unit
stop...FIXME

stop is part of the SparkDataStream abstraction.
allFilesUsingInMemoryFileIndex¶
allFilesUsingInMemoryFileIndex(): Seq[FileStatus]
allFilesUsingInMemoryFileIndex is...FIXME

allFilesUsingInMemoryFileIndex is used for fetchAllFiles.
allFilesUsingMetadataLogFileIndex¶
allFilesUsingMetadataLogFileIndex(): Seq[FileStatus]
allFilesUsingMetadataLogFileIndex is...FIXME

allFilesUsingMetadataLogFileIndex is used for fetchAllFiles.
Logging¶
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.FileStreamSource logger to see what happens inside.

Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSource=ALL
Refer to Logging.