HDFSBackedStateStore
`HDFSBackedStateStore` is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence.

`HDFSBackedStateStore` is created when `HDFSBackedStateStoreProvider` is requested for the specified version of state (store) for update (when the `StateStore` utility is requested to look up a StateStore by provider id).
[[id]] `HDFSBackedStateStore` uses the `StateStoreId` of the owning `HDFSBackedStateStoreProvider`.

[[toString]] When requested for the textual representation, `HDFSBackedStateStore` gives `HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]]`.
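The textual representation above can be illustrated with a minimal sketch. `StoreIdSketch` and `ToStringSketch` are hypothetical stand-ins (not Spark classes) that carry only the three values the format mentions.

```scala
// Hypothetical stand-in for StateStoreId with only the fields used in the text format.
final case class StoreIdSketch(operatorId: Long, partitionId: Int)

// Hypothetical stand-in for HDFSBackedStateStore; baseDir is assumed to be a plain string here.
class ToStringSketch(id: StoreIdSketch, baseDir: String) {
  // Follows the documented pattern HDFSStateStore[id=(op=...,part=...),dir=...]
  override def toString: String =
    s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
```

The operator id, partition id and base directory are simply interpolated into the documented pattern.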
[[logging]]
[TIP]
====
`HDFSBackedStateStore` is an internal class of `HDFSBackedStateStoreProvider` and uses its logger.
====
=== [[creating-instance]] Creating HDFSBackedStateStore Instance
`HDFSBackedStateStore` takes the following to be created:

- [[version]] Version
- [[mapToUpdate]] State Map (`ConcurrentHashMap[UnsafeRow, UnsafeRow]`)

`HDFSBackedStateStore` initializes the internal properties.
=== [[state]] Internal State -- `state` Internal Property

[source, scala]
----
state: STATE
----

`state` is the current state of `HDFSBackedStateStore` and can be in one of the three possible states: `ABORTED`, `COMMITTED`, and `UPDATING`.
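
State changes (to the internal `mapToUpdate` state map) are allowed only while `HDFSBackedStateStore` is in the default `UPDATING` state. After `commit` or `abort`, `HDFSBackedStateStore` transitions to either `COMMITTED` or `ABORTED` state.
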
NOTE: Don't confuse the two meanings of "state": the internal `state` of `HDFSBackedStateStore` and the state of a streaming query (that `HDFSBackedStateStore` is responsible for).
[[states]]
.Internal States
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| ABORTED
a| [[ABORTED]] After `abort`

| COMMITTED
a| [[COMMITTED]] After `commit`

`hasCommitted` indicates whether `HDFSBackedStateStore` is in this state or not.

| UPDATING
a| [[UPDATING]] (default) Initial state after the `HDFSBackedStateStore` was created

Allows for state changes (e.g. `put`)

|===
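
The lifecycle in the table can be modelled as a small state machine. The following is a minimal sketch, not the Spark source: `LifecycleSketch` and its `current` accessor are hypothetical. How `abort` behaves once the store has left `UPDATING` is not described here, so the sketch assumes such calls are ignored.

```scala
// Simplified model of the three internal states.
sealed trait STATE
case object UPDATING extends STATE
case object COMMITTED extends STATE
case object ABORTED extends STATE

// Hypothetical stand-in for HDFSBackedStateStore that only tracks its lifecycle state.
class LifecycleSketch {
  // UPDATING is the default state right after creation.
  @volatile private var state: STATE = UPDATING

  def commit(): Unit = {
    if (state != UPDATING)
      throw new IllegalStateException("Cannot commit after already committed or aborted")
    state = COMMITTED
  }

  // Assumption: abort only has an effect while the store is still UPDATING.
  def abort(): Unit = {
    if (state == UPDATING) state = ABORTED
  }

  def hasCommitted: Boolean = state == COMMITTED

  // Hypothetical accessor, exposed only so that the state can be inspected.
  def current: STATE = state
}
```

A store created this way starts in `UPDATING`, and a single `commit` or `abort` moves it to a terminal state that no further call can leave.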
=== [[writeUpdateToDeltaFile]] writeUpdateToDeltaFile Internal Method

[source, scala]
----
writeUpdateToDeltaFile(
  output: DataOutputStream,
  key: UnsafeRow,
  value: UnsafeRow): Unit
----

CAUTION: FIXME
=== [[put]] put Method

[source, scala]
----
put(
  key: UnsafeRow,
  value: UnsafeRow): Unit
----

NOTE: `put` is a part of StateStore.md#put[StateStore Contract] to...FIXME

`put` stores the copies of the key and value in the `mapToUpdate` state map.

`put` throws an `IllegalStateException` when `HDFSBackedStateStore` is not in `UPDATING` state:
Cannot put after already committed or aborted
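
Why `put` stores copies can be seen in a small sketch. It assumes that rows are mutable objects that a caller may reuse, which is what makes copying matter. `Row`, `PutSketch` and `close` are hypothetical names, and `Row` is a simplified stand-in for `UnsafeRow`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical mutable row; equality is by content so it can serve as a map key.
final class Row(var value: String) {
  def copy(): Row = new Row(value)
  override def equals(other: Any): Boolean = other match {
    case r: Row => r.value == value
    case _      => false
  }
  override def hashCode: Int = value.hashCode
}

// Hypothetical stand-in for HDFSBackedStateStore, limited to the put path.
class PutSketch(mapToUpdate: ConcurrentHashMap[Row, Row]) {
  // true while the store is in the UPDATING state
  private var updating = true

  def put(key: Row, value: Row): Unit = {
    if (!updating)
      throw new IllegalStateException("Cannot put after already committed or aborted")
    // Store copies so later mutation of the caller's rows cannot change stored state.
    mapToUpdate.put(key.copy(), value.copy())
  }

  // Hypothetical: stands for either commit or abort ending the UPDATING state.
  def close(): Unit = {
    updating = false
  }
}
```

With the copies in place, mutating the original `value` row after `put` leaves the stored entry unchanged.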
=== [[commit]] Committing State Changes -- commit Method

[source, scala]
----
commit(): Long
----

`commit` is part of the StateStore abstraction.

`commit` requests the parent `HDFSBackedStateStoreProvider` to commit state changes (as a new version of state) (with the `newVersion`, the `mapToUpdate` and the `compressedStream`).

`commit` transitions `HDFSBackedStateStore` to `COMMITTED` state.

`commit` prints out the following INFO message to the logs:
Committed version [newVersion] for [this] to file [finalDeltaFile]
`commit` returns the `newVersion`.

`commit` throws an `IllegalStateException` when `HDFSBackedStateStore` is not in `UPDATING` state:
Cannot commit after already committed or aborted
`commit` throws an `IllegalStateException` for any `NonFatal` exception:
Error committing version [newVersion] into [this]
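
The control flow of `commit` described above can be sketched as follows. This is a simplified model, not the Spark source: `ProviderSketch` and `CommitSketch` are hypothetical, the provider call takes only the new version, `println` stands in for the logger, and `newVersion = version + 1` is an assumption of the sketch.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the parent HDFSBackedStateStoreProvider.
trait ProviderSketch {
  def commitUpdates(newVersion: Long): Unit
}

class CommitSketch(version: Long, provider: ProviderSketch) {
  // Assumption: the version being committed is the one after the loaded version.
  private val newVersion: Long = version + 1
  private var updating = true
  private var committed = false

  def commit(): Long = {
    // The state check happens before the try block, so this error is not wrapped.
    if (!updating)
      throw new IllegalStateException("Cannot commit after already committed or aborted")
    try {
      provider.commitUpdates(newVersion)
      updating = false
      committed = true
      println(s"Committed version $newVersion for $this")
      newVersion
    } catch {
      // Any non-fatal failure is reported as an IllegalStateException with the cause attached.
      case NonFatal(e) =>
        throw new IllegalStateException(s"Error committing version $newVersion into $this", e)
    }
  }

  def hasCommitted: Boolean = committed
}
```

Keeping the state check outside the `try` block means a second `commit` reports the "already committed or aborted" message directly instead of the generic commit error.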
=== [[abort]] Aborting State Changes -- abort Method

[source, scala]
----
abort(): Unit
----

`abort` is part of the StateStore abstraction.

`abort`...FIXME
=== [[metrics]] Performance Metrics -- metrics Method

[source, scala]
----
metrics: StateStoreMetrics
----

`metrics` is part of the StateStore abstraction.

`metrics` requests the performance metrics of the parent `HDFSBackedStateStoreProvider`.

Of the provider's performance metrics, only the ones listed in `supportedCustomMetrics` are used.

In the end, `metrics` returns a new `StateStoreMetrics` with the following:

- Total number of keys as the size of `mapToUpdate`
- Memory used (in bytes) as the `memoryUsedBytes` metric (of the parent provider)
- `StateStoreCustomMetrics` as the `supportedCustomMetrics` and the `metricStateOnCurrentVersionSizeBytes` metric of the parent provider
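
How the three parts are assembled can be sketched with simplified types. `CustomMetric`, `Metrics` and `MetricsSketch` are hypothetical stand-ins for the Spark classes, and passing the current-version size in as a parameter is an assumption of the sketch.

```scala
// Hypothetical, simplified stand-ins for Spark's custom metric and metrics holder.
final case class CustomMetric(name: String)
final case class Metrics(
    numKeys: Long,
    memoryUsedBytes: Long,
    customMetrics: Map[CustomMetric, Long])

object MetricsSketch {
  def metrics(
      numKeysInMap: Int,
      metricsFromProvider: Map[String, Long],
      supportedCustomMetrics: Seq[CustomMetric],
      currentVersionSizeMetric: CustomMetric,
      currentVersionSizeBytes: Long): Metrics = {
    // Keep only the provider metrics that are listed as supported custom metrics.
    val supported: Map[CustomMetric, Long] = supportedCustomMetrics.flatMap { metric =>
      metricsFromProvider.get(metric.name).map(value => metric -> value)
    }.toMap
    // Add the size of the state on the current version as one more custom metric.
    val custom = supported + (currentVersionSizeMetric -> currentVersionSizeBytes)
    Metrics(
      numKeysInMap.toLong,
      metricsFromProvider.getOrElse("memoryUsedBytes", 0L),
      custom)
  }
}
```

Provider metrics whose names are not among the supported custom metrics are dropped from the custom part, while `memoryUsedBytes` is read separately.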
=== [[hasCommitted]] Are State Changes Committed? -- hasCommitted Method

[source, scala]
----
hasCommitted: Boolean
----

`hasCommitted` is part of the StateStore abstraction.

`hasCommitted` returns `true` when `HDFSBackedStateStore` is in `COMMITTED` state and `false` otherwise.
Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| compressedStream
a| [[compressedStream]]
[source, scala]
----
compressedStream: DataOutputStream
----

The compressed https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html[java.io.DataOutputStream] for the `deltaFileStream`

| deltaFileStream
a| [[deltaFileStream]]
[source, scala]
----
deltaFileStream: CheckpointFileManager.CancellableFSDataOutputStream
----

| finalDeltaFile
a| [[finalDeltaFile]]
[source, scala]
----
finalDeltaFile: Path
----

The Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/Path.html[Path] of the deltaFile for the version

| newVersion
a| [[newVersion]]
[source, scala]
----
newVersion: Long
----

Used when `HDFSBackedStateStore` is requested for the `finalDeltaFile` and to `commit`

|===