HDFSBackedStateStore
HDFSBackedStateStore is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence.
HDFSBackedStateStore is <<creating-instance, created>> when HDFSBackedStateStoreProvider is requested for the specified version of state (store) for update (when the StateStore utility is requested to look up a StateStore by provider id).
[[id]] HDFSBackedStateStore uses the StateStoreId of the owning HDFSBackedStateStoreProvider.
[[toString]] When requested for the textual representation, HDFSBackedStateStore gives HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]].
[[logging]]
[TIP]
====
HDFSBackedStateStore is an internal class of HDFSBackedStateStoreProvider and uses its logger.
====
=== [[creating-instance]] Creating HDFSBackedStateStore Instance
HDFSBackedStateStore takes the following to be created:
- [[version]] Version
- [[mapToUpdate]] State Map (ConcurrentHashMap[UnsafeRow, UnsafeRow])
HDFSBackedStateStore initializes the internal properties.
=== [[state]] Internal State -- state Internal Property
[source, scala]
state: STATE
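state is the current state of HDFSBackedStateStore and can be in one of the three possible states: <<ABORTED, ABORTED>>, <<COMMITTED, COMMITTED>>, and <<UPDATING, UPDATING>>.
State changes (to the internal <<mapToUpdate, state map>>) are allowed only while HDFSBackedStateStore is in the default <<UPDATING, UPDATING>> state. Once HDFSBackedStateStore transitions to either <<COMMITTED, COMMITTED>> or <<ABORTED, ABORTED>>, no further state changes are allowed.
NOTE: Do not confuse the two meanings of "state": the internal <<state, state>> of HDFSBackedStateStore and the state of a streaming query (that HDFSBackedStateStore is responsible for).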
[[states]]
.Internal States
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| ABORTED
a| [[ABORTED]] After <<abort, abort>>

| COMMITTED
a| [[COMMITTED]] After <<commit, commit>>

<<hasCommitted, hasCommitted>> indicates whether HDFSBackedStateStore is in this state or not.

| UPDATING
a| [[UPDATING]] (default) Initial state after the HDFSBackedStateStore was <<creating-instance, created>>

Allows for state changes (e.g. <<put, put>>)

|===
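The lifecycle described in this section can be sketched as a small state machine. This is a minimal illustration only, not Spark's code: `LifecycleSketch`, `verifyUpdating` and `currentState` are hypothetical names, and the behaviour of `abort` is an assumption (the source does not describe it yet).

```scala
// Hypothetical model of the three internal states (names mirror the table above).
sealed trait STATE
case object UPDATING extends STATE
case object COMMITTED extends STATE
case object ABORTED extends STATE

// A toy store that tracks only its lifecycle; it is not Spark's class.
final class LifecycleSketch {
  // UPDATING is the default state right after creation.
  private var state: STATE = UPDATING

  // Guard for operations that are legal only in the UPDATING state.
  private def verifyUpdating(action: String): Unit = {
    if (state != UPDATING) {
      throw new IllegalStateException(s"Cannot $action after already committed or aborted")
    }
  }

  def commit(): Unit = {
    verifyUpdating("commit")
    state = COMMITTED
  }

  // Assumption: abort simply moves the store to ABORTED.
  def abort(): Unit = {
    state = ABORTED
  }

  def hasCommitted: Boolean = state == COMMITTED

  def currentState: STATE = state
}
```

A sealed trait keeps the set of states closed, so a pattern match over `STATE` can be checked for exhaustiveness by the compiler.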
=== [[writeUpdateToDeltaFile]] writeUpdateToDeltaFile Internal Method
[source, scala]
writeUpdateToDeltaFile(
  output: DataOutputStream,
  key: UnsafeRow,
  value: UnsafeRow): Unit
CAUTION: FIXME
=== [[put]] put Method
[source, scala]
put(
  key: UnsafeRow,
  value: UnsafeRow): Unit
NOTE: put is a part of StateStore.md#put[StateStore Contract] to...FIXME
put stores copies of the key and value in the <<mapToUpdate, state map>>.
put reports an IllegalStateException when HDFSBackedStateStore is not in <<UPDATING, UPDATING>> state:
Cannot put after already committed or aborted
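Why put stores copies rather than the rows it is given can be illustrated with a small sketch. It assumes that callers may reuse and mutate row objects after the call. `SketchRow` is a hypothetical stand-in for UnsafeRow, and `PutSketchStore`, `get` and `close` are illustrative helpers, not Spark's API.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for UnsafeRow: a mutable row that a caller may reuse.
final class SketchRow(var data: String) {
  def copy(): SketchRow = new SketchRow(data)
  override def equals(other: Any): Boolean = other match {
    case r: SketchRow => r.data == data
    case _ => false
  }
  override def hashCode: Int = data.hashCode
}

// A toy store; the Boolean flag stands in for the UPDATING state.
final class PutSketchStore {
  private var updating = true
  private val mapToUpdate = new ConcurrentHashMap[SketchRow, SketchRow]()

  def put(key: SketchRow, value: SketchRow): Unit = {
    if (!updating) {
      throw new IllegalStateException("Cannot put after already committed or aborted")
    }
    // Store copies so that later mutation of the caller's rows cannot change the map.
    mapToUpdate.put(key.copy(), value.copy())
  }

  def get(key: SketchRow): Option[SketchRow] = Option(mapToUpdate.get(key))

  // Illustrative only: leaves the updating state without modelling commit or abort.
  def close(): Unit = {
    updating = false
  }
}
```

With this sketch, mutating a row after `put` leaves the stored entry unchanged, which is the property the copies are meant to protect.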
=== [[commit]] Committing State Changes -- commit Method
[source, scala]
commit(): Long
commit is part of the StateStore abstraction.
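commit requests the parent HDFSBackedStateStoreProvider to commit state changes (as a new version of state) (with the <<newVersion, newVersion>>, the <<mapToUpdate, state map>> and the <<compressedStream, compressed stream>>).
commit transitions HDFSBackedStateStore to <<COMMITTED, COMMITTED>> state.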
commit prints out the following INFO message to the logs:
Committed version [newVersion] for [this] to file [finalDeltaFile]
commit returns the <<newVersion, newVersion>>.
commit throws an IllegalStateException when HDFSBackedStateStore is not in <<UPDATING, UPDATING>> state:
Cannot commit after already committed or aborted
commit throws an IllegalStateException for any NonFatal exception:
Error committing version [newVersion] into [this]
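The two failure modes of commit can be sketched as follows. This is a simplified illustration under stated assumptions: `CommitSketch` is hypothetical, the provider's commit logic is passed in as a plain function (`commitUpdates`), `newVersion` is assumed to be `version + 1`, and the state check is assumed to sit outside the `try` block so its message is not rewrapped. Logging is omitted.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: `commitUpdates` stands in for the parent provider's commit logic.
final class CommitSketch(version: Long, commitUpdates: Long => Unit) {
  private var committed = false
  // Assumption: the version to be committed is the one after `version`.
  private val newVersion: Long = version + 1

  def commit(): Long = {
    // State check first; this exception is thrown as-is.
    if (committed) {
      throw new IllegalStateException("Cannot commit after already committed or aborted")
    }
    try {
      commitUpdates(newVersion)
      committed = true
      newVersion
    } catch {
      // Any non-fatal failure is wrapped, keeping the original exception as the cause.
      case NonFatal(e) =>
        throw new IllegalStateException(
          s"Error committing version $newVersion into $this", e)
    }
  }

  def hasCommitted: Boolean = committed

  override def toString: String = "CommitSketch"
}
```

`NonFatal` does not match fatal errors such as `OutOfMemoryError`, so those propagate unchanged instead of being reported as commit failures.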
=== [[abort]] Aborting State Changes -- abort Method
[source, scala]
abort(): Unit
abort is part of the StateStore abstraction.
abort...FIXME
=== [[metrics]] Performance Metrics -- metrics Method
[source, scala]
metrics: StateStoreMetrics
metrics is part of the StateStore abstraction.
metrics requests the performance metrics of the parent HDFSBackedStateStoreProvider.
Of the provider's performance metrics, only the ones listed in supportedCustomMetrics are used.
In the end, metrics returns a new StateStoreMetrics with the following:
- Total number of keys as the size of <<mapToUpdate, mapToUpdate>>
- Memory used (in bytes) as the memoryUsedBytes metric (of the parent provider)
- StateStoreCustomMetrics as the supportedCustomMetrics and the metricStateOnCurrentVersionSizeBytes metric of the parent provider
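How the three parts of the result fit together can be sketched with plain maps. Everything here is a simplification and an assumption: `MetricsSketch` is a hypothetical stand-in for StateStoreMetrics, metrics are keyed by name rather than by StateStoreCustomMetric instances, and the size of the current version is passed in rather than estimated.

```scala
// Hypothetical, simplified shape of StateStoreMetrics (custom metrics keyed by name).
final case class MetricsSketch(
    numKeys: Long,
    memoryUsedBytes: Long,
    customMetrics: Map[String, Long])

object MetricsSketchBuilder {
  def build(
      stateMapSize: Int,
      providerMetrics: Map[String, Long],
      supportedCustomMetrics: Set[String],
      currentVersionSizeBytes: Long): MetricsSketch = {
    // Keep only the provider metrics that are declared as supported custom metrics.
    val custom = providerMetrics.filter { case (name, _) =>
      supportedCustomMetrics.contains(name)
    }
    MetricsSketch(
      numKeys = stateMapSize.toLong,
      // Assumption: a missing memoryUsedBytes metric is reported as 0.
      memoryUsedBytes = providerMetrics.getOrElse("memoryUsedBytes", 0L),
      customMetrics = custom + ("metricStateOnCurrentVersionSizeBytes" -> currentVersionSizeBytes))
  }
}
```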
=== [[hasCommitted]] Are State Changes Committed? -- hasCommitted Method
[source, scala]
hasCommitted: Boolean
hasCommitted is part of the StateStore abstraction.
hasCommitted returns true when HDFSBackedStateStore is in <<COMMITTED, COMMITTED>> state and false otherwise.
=== Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description
| compressedStream a| [[compressedStream]]
[source, scala]
compressedStream: DataOutputStream
The compressed https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html[java.io.DataOutputStream] for the <<deltaFileStream, deltaFileStream>>
| deltaFileStream a| [[deltaFileStream]]
[source, scala]
deltaFileStream: CheckpointFileManager.CancellableFSDataOutputStream
| finalDeltaFile a| [[finalDeltaFile]]
[source, scala]
finalDeltaFile: Path
The Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/Path.html[Path] of the deltaFile for the version
| newVersion a| [[newVersion]]
[source, scala]
newVersion: Long
Used exclusively when HDFSBackedStateStore is requested for the <<finalDeltaFile, finalDeltaFile>>, to <<commit, commit>> and <<abort, abort>>
|===