RocksDB

Creating Instance

RocksDB takes the following to be created:

  • DFS Root Directory
  • RocksDBConf
  • Local Root Directory
  • Hadoop Configuration
  • Logging ID

RocksDB is created when:

  • RocksDBStateStoreProvider is requested for the RocksDB

commitLatencyMs

commitLatencyMs: HashMap[String, Long]

RocksDB creates an empty commitLatencyMs collection when created.

The following durations are added at the end of committing state changes:

  • checkpoint
  • compact
  • fileSync
  • flush
  • pause
  • writeBatch

commitLatencyMs is used in metrics to create a RocksDBMetrics.
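
A minimal sketch of how such a latency map can be populated, assuming an illustrative timeTakenMs helper (not necessarily Spark's exact one):

import scala.collection.mutable

val commitLatencyMs = new mutable.HashMap[String, Long]()

// Illustrative helper: wall-clock duration of a block in milliseconds.
def timeTakenMs(body: => Unit): Long = {
  val start = System.nanoTime()
  body
  (System.nanoTime() - start) / 1000000
}

// At the end of commit, the measured durations are recorded under fixed keys.
commitLatencyMs += "flush" -> timeTakenMs { /* e.g. db.flush(flushOptions) */ }
commitLatencyMs += "compact" -> timeTakenMs { /* e.g. db.compactRange() */ }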

ReadOptions

RocksDB creates a ReadOptions (RocksDB) when created.

Used when:

  • RocksDB is requested to retrieve the value for a key

Closed in close

WriteBatchWithIndex

RocksDB creates a WriteBatchWithIndex (RocksDB) (with overwriteKey enabled) when created.
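
With the RocksDB Java API, overwriteKey is a constructor flag; a minimal sketch:

import org.rocksdb.{RocksDB => NativeRocksDB, WriteBatchWithIndex}

NativeRocksDB.loadLibrary()

// overwriteKey = true makes a later update of a key replace the earlier one
// in the batch's index, so a lookup only ever sees the latest value.
val writeBatch = new WriteBatchWithIndex(true)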

BlockBasedTableConfig

RocksDB creates a BlockBasedTableConfig (RocksDB) when created.

BlockBasedTableConfig is the config for the block-based table SST format. BlockBasedTable is RocksDB's default SST file format.

The BlockBasedTableConfig is used to create an Options.
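
A minimal sketch of wiring the table format config into an Options with the RocksDB Java API:

import org.rocksdb.{BlockBasedTableConfig, Options, RocksDB => NativeRocksDB}

NativeRocksDB.loadLibrary()

val tableFormatConfig = new BlockBasedTableConfig()
// setTableFormatConfig makes the block-based SST settings part of the
// database-wide Options.
val options = new Options().setTableFormatConfig(tableFormatConfig)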

Options

RocksDB creates an Options (RocksDB) when created.

The Options is used when opening the NativeRocksDB and to access the native Statistics (nativeStats).

Performance Metrics

metrics: RocksDBMetrics

metrics reads the following RocksDB properties:

  • rocksdb.total-sst-files-size
  • rocksdb.estimate-table-readers-mem
  • rocksdb.size-all-mem-tables
  • rocksdb.block-cache-usage

metrics computes writeBatchMemUsage by requesting the WriteBatchWithIndex for the underlying WriteBatch and its data size (getDataSize).
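
Both reads map onto plain RocksDB Java API calls; a sketch (the db and writeBatch parameters stand for the NativeRocksDB and WriteBatchWithIndex described above):

import org.rocksdb.{RocksDB => NativeRocksDB, WriteBatchWithIndex}

def memoryUsages(db: NativeRocksDB, writeBatch: WriteBatchWithIndex): Map[String, Long] = {
  // getProperty returns the value of a RocksDB property as a string.
  val totalSSTFilesBytes = db.getProperty("rocksdb.total-sst-files-size").toLong
  // getDataSize is the serialized size of the updates buffered in the batch.
  val writeBatchMemUsage = writeBatch.getWriteBatch.getDataSize
  Map(
    "totalSSTFilesBytes" -> totalSSTFilesBytes,
    "writeBatchMemUsage" -> writeBatchMemUsage)
}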

metrics computes nativeOpsHistograms.

metrics computes nativeOpsMetrics.

In the end, metrics creates a RocksDBMetrics with the values computed above.

metrics is used when:

  • RocksDB is requested to commit
  • RocksDBStateStore is requested for metrics

Statistics

nativeStats: Statistics

RocksDB requests Options for a Statistics (RocksDB) to initialize nativeStats when created.
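
With the RocksDB Java API, statistics have to be attached to the Options before the database is opened; a minimal sketch:

import org.rocksdb.{Options, RocksDB => NativeRocksDB, Statistics}

NativeRocksDB.loadLibrary()

val dbOptions = new Options()
// Attach a Statistics object so the native library records counters and histograms.
dbOptions.setStatistics(new Statistics())
// Read it back: this is the nativeStats the performance metrics are computed from.
val nativeStats: Statistics = dbOptions.statistics()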

nativeStats is used when:

  • RocksDB is requested for the performance metrics

Retrieving Value for Key

get(
  key: Array[Byte]): Array[Byte]

get requests the WriteBatchWithIndex to getFromBatchAndDB the given key from the NativeRocksDB (with the ReadOptions).
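
getFromBatchAndDB consults the indexed write batch first and falls back to the database, so uncommitted changes in the current batch win over persisted values; a sketch:

import org.rocksdb.{ReadOptions, RocksDB => NativeRocksDB, WriteBatchWithIndex}

// db, writeBatch and readOptions stand for the instances described above.
def get(
    db: NativeRocksDB,
    writeBatch: WriteBatchWithIndex,
    readOptions: ReadOptions,
    key: Array[Byte]): Array[Byte] = {
  // Returns null when the key is in neither the batch nor the database.
  writeBatch.getFromBatchAndDB(db, readOptions, key)
}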


get is used when:

  • RocksDBStateStore is requested to get the value for a key

Committing State Changes

commit(): Long

commit builds a new version by incrementing the loadedVersion.

commit creates a new checkpoint-prefixed directory under the localRootDir.

commit prints out the following INFO message to the logs and records the duration of requesting the db to write out the updates (writeTimeMs).

Writing updates for [newVersion]

commit prints out the following INFO message to the logs and records the duration of requesting the db to flush the changes (flushTimeMs).

Flushing updates for [newVersion]

With spark.sql.streaming.stateStore.rocksdb.compactOnCommit enabled, commit prints out the following INFO message to the logs and records the duration of requesting the db to compactRange (compactTimeMs). Otherwise, the compact time is 0.

Compacting
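
The flag is a regular Spark SQL configuration, so it can be set before starting a streaming query; for example (assuming a running SparkSession in spark):

// Compact the RocksDB instance on every commit (disabled by default).
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true")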

commit prints out the following INFO message to the logs and records the duration of requesting the db to pauseBackgroundWork (pauseTimeMs).

Pausing background work

commit prints out the following INFO message to the logs and records the duration of requesting the db to createCheckpoint in the checkpoint directory (checkpointTimeMs).

Creating checkpoint for [newVersion] in [checkpointDir]

commit prints out the following INFO message to the logs and records the duration of requesting the RocksDBFileManager to saveCheckpointToDfs with the checkpoint directory and the numKeysOnWritingVersion (fileSyncTimeMs).

Syncing checkpoint for [newVersion] to DFS

commit updates the internal registries.

Internal Registry        New Value
-----------------        ---------
numKeysOnLoadedVersion   numKeysOnWritingVersion
loadedVersion            newVersion
fileManagerMetrics       latestSaveCheckpointMetrics from the RocksDBFileManager

In the end, commit updates the commitLatencyMs metrics and prints out the following INFO message to the logs:

Committed [newVersion], stats = [metrics]
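
The whole sequence maps onto plain RocksDB Java API calls; a condensed sketch of the happy path (logging, metrics and the DFS sync omitted; db, writeBatch and checkpointDir stand for the instances described above):

import java.io.File
import org.rocksdb.{Checkpoint, FlushOptions, RocksDB => NativeRocksDB, WriteBatchWithIndex, WriteOptions}

def commitSketch(
    db: NativeRocksDB,
    writeBatch: WriteBatchWithIndex,
    checkpointDir: File,
    compactOnCommit: Boolean): Unit = {
  // 1. Write out the buffered updates (writeBatch).
  db.write(new WriteOptions(), writeBatch)
  // 2. Flush the memtables and wait for the flush to finish (flush).
  db.flush(new FlushOptions().setWaitForFlush(true))
  // 3. Optionally compact the whole key range (compact).
  if (compactOnCommit) db.compactRange()
  // 4. Pause background work so the checkpoint is consistent (pause).
  db.pauseBackgroundWork()
  // 5. Create a native checkpoint in the checkpoint directory (checkpoint).
  Checkpoint.create(db).createCheckpoint(checkpointDir.toString)
}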

commit is used when:

  • RocksDBStateStore is requested to commit state changes

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.RocksDB logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.RocksDB=ALL

Refer to Logging.