RocksDB
Creating Instance
RocksDB takes the following to be created:
- DFS root directory
- RocksDBConf
- Local root directory
- Hadoop Configuration
- Logging ID
RocksDB is created when:
- RocksDBStateStoreProvider is requested for the RocksDB
commitLatencyMs
commitLatencyMs: HashMap[String, Long]
RocksDB creates an empty commitLatencyMs collection when created.
The following durations are added at the end of committing state changes:
- checkpoint
- compact
- fileSync
- flush
- pause
- writeBatch
commitLatencyMs is used in metrics to create a RocksDBMetrics.
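As a rough illustration of how such a registry gets filled, here is a minimal Scala sketch. The CommitLatency class and its timed helper are hypothetical names, not Spark's code; the sketch only assumes that each commit phase is timed with System.nanoTime and stored in milliseconds under the phase names listed above.

```scala
import scala.collection.mutable

// Hypothetical sketch (not Spark's code): a commitLatencyMs-like registry plus a
// helper that times a block of work and records its duration under a phase name
final class CommitLatency {
  // phase name -> duration in milliseconds; empty when created
  val commitLatencyMs = new mutable.HashMap[String, Long]()

  // Runs `body`, records how long it took under `phase`, and returns its result
  def timed[A](phase: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    commitLatencyMs(phase) = (System.nanoTime() - start) / 1000000L
    result
  }
}
```

Each commit phase would be wrapped in `timed("flush") { ... }` and so on, so the registry ends up with one entry per phase that metrics can read later.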
ReadOptions
RocksDB creates a ReadOptions (RocksDB) when created.
The ReadOptions is used when:
- get
- put (with trackTotalNumberOfRows enabled)
- remove (with trackTotalNumberOfRows enabled)

The ReadOptions is closed in close.
WriteBatchWithIndex
RocksDB creates a WriteBatchWithIndex (RocksDB) (with overwriteKey enabled) when created.
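The effect of overwriteKey can be pictured with a small in-memory model. IndexedWriteBatch below is hypothetical (it is not the RocksDB class) and uses String keys instead of Array[Byte]; it only mimics the documented behaviour that, with overwriteKey enabled, a later update to a key replaces the earlier one in the index, so each key appears at most once.

```scala
import scala.collection.mutable

// Hypothetical model of a key-indexed write batch with overwriteKey = true.
// Strings stand in for Array[Byte] keys and values.
final class IndexedWriteBatch {
  // Sorted index of pending updates; None marks a pending delete
  private val index = mutable.TreeMap.empty[String, Option[String]]

  // A later put or delete of the same key overwrites the earlier index entry
  def put(key: String, value: String): Unit = index(key) = Some(value)
  def delete(key: String): Unit = index(key) = None

  // Pending updates in key order, one entry per key
  def pendingUpdates: List[(String, Option[String])] = index.toList
}
```

Writing `a` twice and then deleting `b` leaves exactly two entries: the latest value of `a` and a delete marker for `b`.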
BlockBasedTableConfig
RocksDB creates a BlockBasedTableConfig (RocksDB) when created.
BlockBasedTableConfig is the configuration of the block-based table SST file format (BlockBasedTable), which is RocksDB's default SST file format.
The BlockBasedTableConfig is used as the table format config of the Options.
Options
RocksDB creates an Options (RocksDB) when created.
The Options is used in the following:
Performance Metrics
metrics: RocksDBMetrics
metrics reads the following RocksDB properties:
- rocksdb.total-sst-files-size
- rocksdb.estimate-table-readers-mem
- rocksdb.size-all-mem-tables
- rocksdb.block-cache-usage
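A sketch of collecting these four properties, assuming a string-valued lookup like the getProperty method of the RocksDB Java API. DbPropertiesSketch and readProperties are hypothetical names, and the native handle is replaced with a plain String => String function so the sketch runs without RocksDB.

```scala
// Hypothetical sketch: read the properties named above through a String => String
// lookup that stands in for the native RocksDB handle
object DbPropertiesSketch {
  val propertyNames: Seq[String] = Seq(
    "rocksdb.total-sst-files-size",
    "rocksdb.estimate-table-readers-mem",
    "rocksdb.size-all-mem-tables",
    "rocksdb.block-cache-usage")

  // Property values come back as strings; parse each one into a Long
  def readProperties(getProperty: String => String): Map[String, Long] =
    propertyNames.map(name => name -> getProperty(name).trim.toLong).toMap
}
```

Because a Scala Map is also a function from keys to values, a Map of fake property values can be passed in directly for testing.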
metrics computes writeBatchMemUsage by requesting the WriteBatchWithIndex for the underlying WriteBatch and its data size (getDataSize).
metrics computes nativeOpsHistograms.
metrics computes nativeOpsMetrics.
In the end, metrics creates a RocksDBMetrics with the following:
metrics is used when:
Statistics
nativeStats: Statistics
RocksDB requests Options for a Statistics (RocksDB) to initialize nativeStats when created.
nativeStats is used when:
- RocksDB is requested to load a version (which resets the statistics when resetStatsOnLoad is enabled)
- RocksDB is requested for the metrics (getHistogramData and getTickerCount)
Retrieving Value for Key
get(key: Array[Byte]): Array[Byte]
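get requests the WriteBatchWithIndex to look up the given key using getFromBatchAndDB with the NativeRocksDB and the ReadOptions, so uncommitted writes in the batch take precedence over the values already in the database.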
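The lookup order of getFromBatchAndDB (pending writes in the batch take precedence over the database) can be modelled in a few lines of Scala. This is a hypothetical sketch, not the RocksDB implementation: both sides are plain Maps with String keys, and a None in the batch stands for a pending delete.

```scala
// Hypothetical model of a batch-then-database read (not RocksDB's implementation).
// batch: pending writes, Some(v) = put, None = delete; db: committed data.
object BatchThenDbLookup {
  def getFromBatchThenDb(
      batch: Map[String, Option[String]],
      db: Map[String, String],
      key: String): Option[String] =
    // A key touched in the batch wins, including deletes; otherwise read the database
    batch.getOrElse(key, db.get(key))
}
```

A key deleted in the batch is reported as missing even though the database still holds a value for it, which is the read-your-own-writes behaviour a state store needs before commit.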
get is used when:
- RocksDBStateStore is requested to get a value for a key
Committing State Changes
commit(): Long
commit builds a new version by incrementing the loadedVersion.
commit creates a new directory with the checkpoint- prefix under the localRootDir.
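A minimal sketch of creating such a directory with the JDK, assuming java.nio's Files.createTempDirectory, which appends a random suffix to the checkpoint- prefix so every commit gets a fresh directory. Spark may use a different helper; newCheckpointDir is a hypothetical name.

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch: create a fresh "checkpoint-"-prefixed directory under the
// local root directory (the JDK adds a random suffix to keep names unique)
object CheckpointDirSketch {
  def newCheckpointDir(localRootDir: Path): Path =
    Files.createTempDirectory(localRootDir, "checkpoint-")
}
```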
commit prints out the following INFO message to the logs and records the duration of requesting the db to write out the updates (writeTimeMs).
Writing updates for [newVersion]
commit prints out the following INFO message to the logs and records the duration of requesting the db to flush the changes (flushTimeMs).
Flushing updates for [newVersion]
With spark.sql.streaming.stateStore.rocksdb.compactOnCommit enabled, commit prints out the following INFO message to the logs and records the duration of requesting the db to compactRange (compactTimeMs). Otherwise, the compact time is 0.
Compacting
commit prints out the following INFO message to the logs and records the duration of requesting the db to pauseBackgroundWork (pauseTimeMs).
Pausing background work
commit prints out the following INFO message to the logs and records the duration of requesting the db to createCheckpoint in the checkpoint directory (checkpointTimeMs).
Creating checkpoint for [newVersion] in [checkpointDir]
commit prints out the following INFO message to the logs and records the duration of requesting the RocksDBFileManager to saveCheckpointToDfs with the checkpoint directory and the numKeysOnWritingVersion (fileSyncTimeMs).
Syncing checkpoint for [newVersion] to DFS
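The order of the timed phases above, and the fact that the compact duration is 0 unless compactOnCommit is enabled, can be summarized in a sketch. The Db trait is hypothetical: its methods only echo the operation names in the text (they are not RocksDB or Spark signatures), and the sketch leaves out logging and the internal registry updates.

```scala
// Hypothetical stand-in for the native db handle and the RocksDBFileManager;
// method names echo the text above, signatures are made up for the sketch
trait Db {
  def write(): Unit
  def flush(): Unit
  def compactRange(): Unit
  def pauseBackgroundWork(): Unit
  def createCheckpoint(checkpointDir: String): Unit
  def saveCheckpointToDfs(checkpointDir: String, numKeys: Long): Unit
}

object CommitPhasesSketch {
  // Duration of `body` in milliseconds
  private def timeMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  // Returns the new version and per-phase durations keyed like commitLatencyMs
  def commit(
      db: Db,
      loadedVersion: Long,
      checkpointDir: String,
      numKeysOnWritingVersion: Long,
      compactOnCommit: Boolean): (Long, Map[String, Long]) = {
    val newVersion = loadedVersion + 1
    val writeTimeMs = timeMs(db.write())
    val flushTimeMs = timeMs(db.flush())
    // Compaction runs only with compactOnCommit enabled; otherwise its time is 0
    val compactTimeMs = if (compactOnCommit) timeMs(db.compactRange()) else 0L
    val pauseTimeMs = timeMs(db.pauseBackgroundWork())
    val checkpointTimeMs = timeMs(db.createCheckpoint(checkpointDir))
    val fileSyncTimeMs = timeMs(db.saveCheckpointToDfs(checkpointDir, numKeysOnWritingVersion))
    val latencies = Map(
      "writeBatch" -> writeTimeMs,
      "flush" -> flushTimeMs,
      "compact" -> compactTimeMs,
      "pause" -> pauseTimeMs,
      "checkpoint" -> checkpointTimeMs,
      "fileSync" -> fileSyncTimeMs)
    (newVersion, latencies)
  }
}
```

Plugging in a Db that only records which method was called is enough to check the phase order without a real database.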
commit updates the internal registries.
| Internal Registry | New Value |
|---|---|
| numKeysOnLoadedVersion | numKeysOnWritingVersion |
| loadedVersion | newVersion |
| fileManagerMetrics | latestSaveCheckpointMetrics from the RocksDBFileManager |
In the end, commit updates the commitLatencyMs metrics and prints out the following INFO message to the logs:
Committed [newVersion], stats = [metrics]
commit is used when:
- RocksDBStateStore is requested to commit state changes
Logging
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.RocksDB logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.state.RocksDB=ALL
Refer to Logging.