RocksDB¶
Creating Instance¶
RocksDB takes the following to be created:
- DFS Root Directory
- RocksDBConf
- Local root directory
- Hadoop Configuration
- Logging ID
RocksDB is created when:
- RocksDBStateStoreProvider is requested for the RocksDB
commitLatencyMs¶
commitLatencyMs: HashMap[String, Long]
RocksDB creates an empty commitLatencyMs collection when created.
The following durations are added at the end of committing state changes:
- checkpoint
- compact
- fileSync
- flush
- pause
- writeBatch
commitLatencyMs is used in metrics to create a RocksDBMetrics.
ReadOptions¶
RocksDB creates a ReadOptions (RocksDB) when created.
Used when:
- get
- put (with trackTotalNumberOfRows enabled)
- remove (with trackTotalNumberOfRows enabled)
Closed in close
WriteBatchWithIndex¶
RocksDB creates a WriteBatchWithIndex (RocksDB) (with overwriteKey enabled) when created.
BlockBasedTableConfig¶
RocksDB creates a BlockBasedTableConfig (RocksDB) when created.
BlockBasedTableConfig is the config for the block-based table SST format. BlockBasedTable is RocksDB's default SST file format.
The BlockBasedTableConfig is used to create an Options.
Options¶
RocksDB creates an Options (RocksDB) when created.
The Options is used in the following:
Performance Metrics¶
metrics: RocksDBMetrics
metrics reads the following RocksDB properties:
- rocksdb.total-sst-files-size
- rocksdb.estimate-table-readers-mem
- rocksdb.size-all-mem-tables
- rocksdb.block-cache-usage
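The RocksDB Java API returns a property value as text (`getProperty` gives back a `String`), so numeric properties have to be parsed before they can feed a metric. The following is a minimal sketch of that conversion, not Spark's actual code: `DbPropertiesSketch`, `readAll` and the `getProperty` function parameter are hypothetical names, and the lookup function stands in for the native database.

```scala
object DbPropertiesSketch {
  // The property names listed above
  val propertyNames: Seq[String] = Seq(
    "rocksdb.total-sst-files-size",
    "rocksdb.estimate-table-readers-mem",
    "rocksdb.size-all-mem-tables",
    "rocksdb.block-cache-usage")

  // `getProperty` is a stand-in (an assumption of this sketch) for the
  // native lookup, which returns the property value as text
  def readAll(getProperty: String => String): Map[String, Long] =
    propertyNames.map(name => name -> getProperty(name).toLong).toMap
}
```

Passing the lookup as a function keeps the sketch runnable without the native library on the classpath.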
metrics computes writeBatchMemUsage by requesting the RocksDB WriteBatchWithIndex for WriteBatch to getDataSize.
metrics computes nativeOpsHistograms.
metrics computes nativeOpsMetrics.
In the end, metrics creates a RocksDBMetrics with the following:
metrics is used when:
Statistics¶
nativeStats: Statistics
RocksDB requests Options for a Statistics (RocksDB) to initialize nativeStats when created.
nativeStats is used when:
- Load a version (that can reset the statistics when resetStatsOnLoad is enabled)
- Requested for the metrics (for getHistogramData and getTickerCount)
Retrieving Value for Key¶
get(
key: Array[Byte]): Array[Byte]
get requests the WriteBatchWithIndex to getFromBatchAndDB the given key from the NativeRocksDB (with the ReadOptions).
get is used when:
- RocksDBStateStore is requested to get a value for a key
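The point of reading through the write batch is that uncommitted puts and removes shadow whatever the database already holds. The sketch below models only that lookup order with in-memory maps. It is an illustration under stated assumptions, not the RocksDB implementation: `BatchOverStore` is a hypothetical class, keys and values are `String`s rather than `Array[Byte]` for readability, and a missing key is reported as `null`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a write batch layered over a committed store
final class BatchOverStore(committed: Map[String, String]) {
  // Some(value) is a pending put, None is a pending remove (a tombstone)
  private val pending = mutable.Map.empty[String, Option[String]]

  def put(key: String, value: String): Unit = pending(key) = Some(value)
  def remove(key: String): Unit = pending(key) = None

  // Consult the batch first and fall back to the committed store only
  // when the batch has no entry for the key
  def get(key: String): String = pending.get(key) match {
    case Some(Some(value)) => value
    case Some(None)        => null
    case None              => committed.getOrElse(key, null)
  }
}
```

For example, after `put("a", "10")` a lookup of `a` sees `10` even though the committed store still holds the old value, and a removed key reads as missing before any commit.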
Committing State Changes¶
commit(): Long
commit builds a new version by incrementing the loadedVersion.
commit creates a new directory (with the checkpoint- prefix) under the localRootDir.
commit prints out the following INFO message to the logs and records the duration of requesting the db to write out the updates (writeTimeMs).
Writing updates for [newVersion]
commit prints out the following INFO message to the logs and records the duration of requesting the db to flush the changes (flushTimeMs).
Flushing updates for [newVersion]
With spark.sql.streaming.stateStore.rocksdb.compactOnCommit enabled, commit prints out the following INFO message to the logs and records the duration of requesting the db to compactRange (compactTimeMs). Otherwise, the compact time is 0.
Compacting
commit prints out the following INFO message to the logs and records the duration of requesting the db to pauseBackgroundWork (pauseTimeMs).
Pausing background work
commit prints out the following INFO message to the logs and records the duration of requesting the db to createCheckpoint in the checkpoint directory (checkpointTimeMs).
Creating checkpoint for [newVersion] in [checkpointDir]
commit prints out the following INFO message to the logs and records the duration of requesting the RocksDBFileManager to saveCheckpointToDfs with the checkpoint directory and the numKeysOnWritingVersion (fileSyncTimeMs).
Syncing checkpoint for [newVersion] to DFS
commit updates the internal registries.
Internal Registry | New Value |
---|---|
numKeysOnLoadedVersion | numKeysOnWritingVersion |
loadedVersion | newVersion |
fileManagerMetrics | latestSaveCheckpointMetrics from the RocksDBFileManager |
In the end, commit updates the commitLatencyMs metrics and prints out the following INFO message to the logs:
Committed [newVersion], stats = [metrics]
commit is used when:
- RocksDBStateStore is requested to commit state changes
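The commit steps amount to a fixed sequence of timed actions whose durations end up in commitLatencyMs. The sketch below shows that shape only and is not Spark's code: `CommitTimingSketch`, its `timeTakenMs` helper and the `runStep` callback are hypothetical, and each native call (write, flush, compactRange, and so on) is replaced by a callback that receives the latency label.

```scala
import scala.collection.mutable

object CommitTimingSketch {
  // Runs the given block and returns its duration in milliseconds
  def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  // `runStep` is a placeholder (an assumption of this sketch) for the
  // real work done in each step
  def commit(
      loadedVersion: Long,
      compactOnCommit: Boolean,
      runStep: String => Unit): (Long, mutable.HashMap[String, Long]) = {
    val newVersion = loadedVersion + 1
    val writeTimeMs = timeTakenMs(runStep("writeBatch"))
    val flushTimeMs = timeTakenMs(runStep("flush"))
    // Compaction is optional; its latency is 0 when it is skipped
    val compactTimeMs =
      if (compactOnCommit) timeTakenMs(runStep("compact")) else 0L
    val pauseTimeMs = timeTakenMs(runStep("pause"))
    val checkpointTimeMs = timeTakenMs(runStep("checkpoint"))
    val fileSyncTimeMs = timeTakenMs(runStep("fileSync"))
    val commitLatencyMs = mutable.HashMap[String, Long](
      "writeBatch" -> writeTimeMs,
      "flush" -> flushTimeMs,
      "compact" -> compactTimeMs,
      "pause" -> pauseTimeMs,
      "checkpoint" -> checkpointTimeMs,
      "fileSync" -> fileSyncTimeMs)
    (newVersion, commitLatencyMs)
  }
}
```

Recording 0 for a skipped compaction keeps the set of latency keys the same for every commit, so a consumer of the map does not need to handle a missing entry.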
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.RocksDB logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.state.RocksDB=ALL
Refer to Logging.
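Spark 3.3 and later use Log4j 2, which reads conf/log4j2.properties and uses a different property syntax. Assuming such a setup, an equivalent configuration might look as follows (the `rocksdb` logger ID is an arbitrary label chosen for this example):

```properties
# Hypothetical logger ID "rocksdb"; any unique ID works
logger.rocksdb.name = org.apache.spark.sql.execution.streaming.state.RocksDB
logger.rocksdb.level = all
```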