
RocksDBStateStoreProvider

RocksDBStateStoreProvider is a StateStoreProvider.

RocksDB

rocksDB: RocksDB
Lazy Value

rocksDB is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
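The once-only behaviour of a lazy value can be seen in a minimal sketch. `LazyHolder`, `handle` and `initCount` are hypothetical names used for illustration only and are not part of Spark.

```scala
// Hypothetical class for illustration; not part of Spark.
final class LazyHolder {
  // Counts how many times the initializer body runs.
  var initCount: Int = 0

  // The block runs on first access only; later reads reuse the result.
  lazy val handle: String = {
    initCount += 1
    "handle"
  }
}

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    val holder = new LazyHolder
    println(holder.initCount) // nothing has touched the lazy value yet
    val first = holder.handle  // triggers the initializer
    val second = holder.handle // reuses the computed value
    println(first eq second)
    println(holder.initCount)
  }
}
```

Creating the holder does not run the initializer; only the first read of `handle` does.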

rocksDB requests the StateStoreId for storeCheckpointLocation.

rocksDB builds a store identifier (using the StateStoreId):

StateStoreId(opId=[operatorId],partId=[partitionId],name=[storeName])
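As a rough sketch, the identifier format above could be produced with plain string interpolation. `StoreId` is a hypothetical stand-in reduced to the three fields that appear in the identifier; it is not Spark's `StateStoreId` class.

```scala
// Hypothetical stand-in for Spark's StateStoreId, reduced to the three
// fields that appear in the identifier string.
final case class StoreId(operatorId: Long, partitionId: Int, storeName: String)

object StoreIdentifierDemo {
  // Formats the identifier in the shape shown in the text above.
  def storeIdStr(id: StoreId): String =
    s"StateStoreId(opId=${id.operatorId},partId=${id.partitionId},name=${id.storeName})"

  def main(args: Array[String]): Unit = {
    // prints: StateStoreId(opId=0,partId=3,name=default)
    println(storeIdStr(StoreId(0L, 3, "default")))
  }
}
```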

rocksDB creates a local root directory (a temporary directory whose name is based on the store identifier).

spark.local.dir

Use the spark.local.dir Spark property to set the location of the local root directory.

In the end, rocksDB creates a RocksDB for the store identifier.
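The directory step can be approximated with the JDK alone. This is a sketch under assumptions: `localRoot` and `createStoreDir` are hypothetical helpers, the `configured` argument stands in for the value of spark.local.dir, and falling back to the JVM temp directory is an illustrative choice rather than a statement of what Spark does internally.

```scala
import java.nio.file.{Files, Path, Paths}

object LocalRootDirDemo {
  // Resolves the root: an explicit setting (standing in for spark.local.dir)
  // or the JVM temp directory otherwise. The fallback is an assumption.
  def localRoot(configured: Option[String]): Path =
    Paths.get(configured.getOrElse(System.getProperty("java.io.tmpdir")))

  // Creates a fresh, uniquely named directory under the root,
  // using the store identifier as the name prefix.
  def createStoreDir(root: Path, storeIdStr: String): Path =
    Files.createTempDirectory(root, storeIdStr)

  def main(args: Array[String]): Unit = {
    val dir = createStoreDir(localRoot(None), "StateStoreId(opId=0,partId=3,name=default)")
    println(dir)
    Files.delete(dir) // clean up the demo directory
  }
}
```

Using the identifier as a prefix keeps directories of different stores apart while the generated suffix avoids collisions between runs.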


The rocksDB lazy value is initialized in init.


rocksDB is used when:

Initialization

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  numColsPrefixKey: Int,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit

init is part of the StateStoreProvider abstraction.


init sets the internal registries (based on the given arguments):

In the end, init initializes the RocksDB (lazy value).

Looking Up StateStore by Version

getStore(
  version: Long): StateStore

getStore is part of the StateStoreProvider abstraction.


getStore requests the RocksDB to load data for the given version.

In the end, getStore creates a RocksDBStateStore for the given version.
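The interplay of init, the lazy value and getStore can be modelled with a toy sketch. All types here (`FakeDb`, `FakeStore`, `FakeProvider`) are hypothetical and heavily simplified; they only mirror the order of steps described above, not Spark's actual classes or signatures.

```scala
// Hypothetical, heavily simplified stand-ins; none of these are Spark classes.
final class FakeDb {
  var loadedVersion: Long = -1L
  // Pretends to load the data of the given version.
  def load(version: Long): Unit = loadedVersion = version
}

final case class FakeStore(version: Long)

final class FakeProvider {
  private var storeName: String = ""

  // Created on first access, as with the rocksDB lazy value.
  lazy val db: FakeDb = new FakeDb

  // Records the arguments, then touches the lazy value so it is created here.
  def init(name: String): Unit = {
    storeName = name
    val _ = db
  }

  // Loads the requested version first, then hands out a store for it.
  def getStore(version: Long): FakeStore = {
    db.load(version)
    FakeStore(version)
  }
}

object ProviderLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val provider = new FakeProvider
    provider.init("default")
    println(provider.getStore(5L))
  }
}
```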

Supported Custom Metrics

supportedCustomMetrics: Seq[StateStoreCustomMetric]

supportedCustomMetrics is part of the StateStoreProvider abstraction.

Note

The following is a subset of the supported custom metrics.

rocksdbBytesCopied

RocksDB: file manager - bytes copied

rocksdbCommitCheckpointLatency

RocksDB: commit - checkpoint time

rocksdbCommitCompactLatency

RocksDB: commit - compact time

rocksdbCommitFileSyncLatencyMs

RocksDB: commit - file sync to external storage time

rocksdbCommitFlushLatency

RocksDB: commit - flush time

rocksdbCommitPauseLatency

RocksDB: commit - pause bg time

rocksdbCommitWriteBatchLatency

RocksDB: commit - write batch time

rocksdbFilesCopied

RocksDB: file manager - files copied

rocksdbGetCount

RocksDB: number of get calls

rocksdbGetLatency

RocksDB: total get call latency

rocksdbPutCount

RocksDB: number of put calls

rocksdbPutLatency

RocksDB: total put call latency

rocksdbReadBlockCacheHitCount

RocksDB: read - count of cache hits in RocksDB block cache avoiding disk read

rocksdbReadBlockCacheMissCount

RocksDB: read - count of cache misses that required reading from local disk

rocksdbTotalBytesRead

RocksDB: read - total of uncompressed bytes read (from memtables/cache/sst) from DB::Get()

rocksdbTotalBytesReadByCompaction

RocksDB: compaction - total bytes read by the compaction process

rocksdbTotalBytesWritten

RocksDB: write - total of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()}

rocksdbTotalBytesWrittenByCompaction

RocksDB: compaction - total bytes written by the compaction process

rocksdbTotalCompactionLatencyMs

RocksDB: compaction - total compaction time including background
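
As an example of putting these metrics to use, the sketch below derives a block cache hit ratio from the two cache counters. It assumes the metrics arrive as a Java map of names to long values (the shape of `customMetrics` on a state operator's progress); `blockCacheHitRatio` is a hypothetical helper and the sample values are made up.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object CacheHitRatioDemo {
  // Returns hits / (hits + misses), or None when either metric is absent
  // or no reads were recorded.
  def blockCacheHitRatio(metrics: JMap[String, java.lang.Long]): Option[Double] =
    for {
      hits <- Option(metrics.get("rocksdbReadBlockCacheHitCount")).map(_.longValue)
      misses <- Option(metrics.get("rocksdbReadBlockCacheMissCount")).map(_.longValue)
      total = hits + misses
      if total > 0
    } yield hits.toDouble / total

  def main(args: Array[String]): Unit = {
    // Made-up sample values for illustration.
    val sample = new JHashMap[String, java.lang.Long]()
    sample.put("rocksdbReadBlockCacheHitCount", 90L)
    sample.put("rocksdbReadBlockCacheMissCount", 10L)
    println(blockCacheHitRatio(sample))
  }
}
```

Returning an Option avoids a division by zero when a micro-batch performed no reads.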