CheckpointFileManager
CheckpointFileManager
is the <
CheckpointFileManager
is <
CheckpointFileManager is used exclusively by HDFSMetadataLog, StreamMetadata and HDFSBackedStateStoreProvider.
[[contract]]
.CheckpointFileManager Contract
[cols="30m,70",options="header",width="100%"]
|===
| Method
| Description

| createAtomic a| [[createAtomic]]

[source, scala]
createAtomic(
  path: Path,
  overwriteIfPossible: Boolean): CancellableFSDataOutputStream

Used when:

- HDFSMetadataLog is requested to store metadata for a batch (in writeBatchToFile)
- StreamMetadata helper object is requested to persist metadata
- HDFSBackedStateStore is requested for the deltaFileStream
- HDFSBackedStateStoreProvider is requested to writeSnapshotFile

| delete a| [[delete]]

[source, scala]
delete(path: Path): Unit

Deletes the given path recursively (if it exists)

Used when:

- RenameBasedFSDataOutputStream is requested to cancel
- CompactibleFileStreamLog is requested to store metadata for a batch (in deleteExpiredLog)
- HDFSMetadataLog is requested to remove expired metadata and purgeAfter
- HDFSBackedStateStoreProvider is requested to do maintenance (that cleans up)

| exists a| [[exists]]

[source, scala]
exists(path: Path): Boolean

Used when HDFSMetadataLog is created (to create the metadata directory) and requested for metadata of a batch

| isLocal a| [[isLocal]]

[source, scala]
isLocal: Boolean

Does not seem to be used.

| list a| [[list]]

[source, scala]
list(path: Path): Array[FileStatus] // <1>
list(path: Path, filter: PathFilter): Array[FileStatus]

<1> Uses PathFilter that accepts all files in the path

Lists all files in the given path

Used when:

- HDFSBackedStateStoreProvider is requested for all delta and snapshot files
- CompactibleFileStreamLog is requested for the compact interval and to deleteExpiredLog
- HDFSMetadataLog is requested for metadata of one or more batches, the latest committed batch, ordered batch metadata files, to remove expired metadata and purgeAfter

| mkdirs a| [[mkdirs]]

[source, scala]
mkdirs(path: Path): Unit

Used when:

- HDFSMetadataLog is created
- HDFSBackedStateStoreProvider is requested to initialize

| open a| [[open]]

[source, scala]
open(path: Path): FSDataInputStream

Opens a file (by the given path) for reading

Used when:

- HDFSMetadataLog is requested for metadata of a batch
- HDFSBackedStateStoreProvider is requested to retrieve the state store for a specified version (in updateFromDeltaFile) and to readSnapshotFile

|===
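
The contract methods above can be exercised together in a short round trip. The following is a minimal sketch, not a description of how Spark itself calls them: it assumes spark-sql and the Hadoop client are on the classpath (for example inside spark-shell), that CheckpointFileManager is reachable from user code in the Spark version at hand (it is an internal API and may change), and that a local scratch directory is acceptable. The ContractDemo object and the file name `0` are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical demo object; only the CheckpointFileManager calls come from the contract.
object ContractDemo {
  def run(): String = {
    // Assumption: a throwaway directory on the local file system
    val dir = new Path(Files.createTempDirectory("checkpoint-demo").toUri)
    val fm = CheckpointFileManager.create(dir, new Configuration())

    fm.mkdirs(dir)
    val file = new Path(dir, "0")

    // Write through the cancellable stream: close() publishes the file,
    // cancel() discards whatever was written so far.
    val out = fm.createAtomic(file, overwriteIfPossible = false)
    try {
      out.write("v1".getBytes(UTF_8))
      out.close()
    } catch {
      case e: Throwable =>
        out.cancel()
        throw e
    }

    assert(fm.exists(file))
    assert(fm.list(dir).map(_.getPath.getName).contains("0"))

    // Read the file back
    val in = fm.open(file)
    val content =
      try scala.io.Source.fromInputStream(in, "UTF-8").mkString
      finally in.close()

    fm.delete(dir) // recursive delete of the scratch directory
    content
  }
}
```

Calling `ContractDemo.run()` returns the text that was written. The try/catch around the write is a deliberate choice in this sketch: cancelling on failure is what keeps a half-written file from being published under its final name.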
[[implementations]]
.CheckpointFileManagers
[cols="30,70",options="header",width="100%"]
|===
| CheckpointFileManager
| Description

| FileContextBasedCheckpointFileManager
| [[FileContextBasedCheckpointFileManager]] Default CheckpointFileManager that uses Hadoop's https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FileContext.html[FileContext] API for managing checkpoint files (unless <

| FileSystemBasedCheckpointFileManager
| [[FileSystemBasedCheckpointFileManager]] Basic CheckpointFileManager that uses Hadoop's https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FileSystem.html[FileSystem] API for managing checkpoint files (which requires that FileSystem.rename() is atomic; otherwise the correctness and fault-tolerance of Structured Streaming are not guaranteed)

|===
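
The reliance on an atomic rename can be illustrated without Hadoop. The sketch below is an analogy built on the JDK's java.nio.file API, not Spark's code: it writes to a temporary file in the target's directory and then moves it into place, so a reader sees either no file or the complete file. The AtomicWriteSketch object, the temporary-file naming, and the assumption that the target has a parent directory are all hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper: write-to-temp-then-rename on the local file system.
object AtomicWriteSketch {
  def writeAtomically(target: Path, content: String): Unit = {
    // Assumption: target has a parent directory that already exists
    val temp = Files.createTempFile(target.getParent, "." + target.getFileName, ".tmp")
    try {
      Files.write(temp, content.getBytes(UTF_8))
      // ATOMIC_MOVE fails with an exception if the file system cannot
      // perform the move atomically, rather than silently degrading.
      Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // Nothing to delete after a successful move; cleans up after a failure
      Files.deleteIfExists(temp)
    }
  }
}
```

If the rename were not atomic, a concurrent reader could observe a partially moved file, which is the situation the table warns about.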
=== [[create]] Creating CheckpointFileManager Instance -- create Object Method

[source, scala]
create(
  path: Path,
  hadoopConf: Configuration): CheckpointFileManager

create looks up the spark.sql.streaming.checkpointFileManagerClass configuration property in the given hadoopConf configuration.

If found, create simply instantiates whatever CheckpointFileManager implementation is defined.

If not found, create creates a FileContextBasedCheckpointFileManager.

In case of UnsupportedFileSystemException, create prints out the following WARN message to the logs and creates (falls back on) a FileSystemBasedCheckpointFileManager.

Could not use FileContext API for managing Structured Streaming checkpoint files at [path]. Using FileSystem API instead for managing log files. If the implementation of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of your Structured Streaming is not guaranteed.
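
The selection order described above (configured class first, then the FileContext-based default, then the FileSystem-based fallback) can be modelled in a few lines. This is a simplified, self-contained sketch rather than Spark's implementation: ManagerKind, Custom, UnsupportedFileSystem, CreateSketch, and the newFileContextManager parameter are hypothetical stand-ins, a plain Map replaces the Hadoop Configuration, and the printed text is a placeholder for the real WARN message.

```scala
// Hypothetical stand-ins for the real classes; this models only the decision flow.
sealed trait ManagerKind
final case class Custom(className: String) extends ManagerKind
case object FileContextBased extends ManagerKind
case object FileSystemBased extends ManagerKind

// Stand-in for Hadoop's UnsupportedFileSystemException
final class UnsupportedFileSystem(path: String) extends Exception(path)

object CreateSketch {
  val ClassKey = "spark.sql.streaming.checkpointFileManagerClass"

  def create(
      path: String,
      hadoopConf: Map[String, String],
      newFileContextManager: String => ManagerKind): ManagerKind =
    hadoopConf.get(ClassKey) match {
      // 1. An explicitly configured implementation always wins
      case Some(className) => Custom(className)
      case None =>
        // 2. Otherwise try the FileContext-based default
        try newFileContextManager(path)
        catch {
          // 3. Fall back when the file system has no FileContext support
          case _: UnsupportedFileSystem =>
            println(s"WARN falling back to the FileSystem API for $path")
            FileSystemBased
        }
    }
}
```

Passing the FileContext step in as a function keeps the sketch testable: a caller can simulate an unsupported file system by supplying a function that throws UnsupportedFileSystem.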
create is used when:

- HDFSMetadataLog is created
- StreamMetadata utility is used to write metadata to a file (when StreamExecution is created)
- HDFSBackedStateStoreProvider is requested for a CheckpointFileManager