OffsetSeqMetadata
OffsetSeqMetadata is the metadata of a streaming batch.
OffsetSeqMetadata is persisted in the write-ahead offset log.
Creating Instance
OffsetSeqMetadata takes the following to be created:
- Batch Watermark
- Batch Timestamp
- Configuration (default: empty)
OffsetSeqMetadata is created using apply.
Batch Watermark
OffsetSeqMetadata is given the event-time watermark of the current batch (in milliseconds) when created. Unless given, it defaults to 0.
Batch Timestamp
OffsetSeqMetadata is given the timestamp of the current batch (in milliseconds) when created. Unless given, it defaults to 0.
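The three fields and their defaults can be pictured as a Scala case class. This is a minimal sketch only: OffsetSeqMetadataSketch is a hypothetical name, not Spark's class, and it mirrors nothing beyond the fields and defaults described above.

```scala
// Hypothetical, simplified model of OffsetSeqMetadata (not Spark's class).
// Each field falls back to the default described above when not given.
final case class OffsetSeqMetadataSketch(
    batchWatermarkMs: Long = 0L,          // event-time watermark of the batch (ms)
    batchTimestampMs: Long = 0L,          // timestamp of the batch (ms)
    conf: Map[String, String] = Map.empty // checkpointed configuration properties
)
```

With named arguments, OffsetSeqMetadataSketch(batchTimestampMs = 42L) keeps the watermark at 0 and the configuration empty.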
Creating OffsetSeqMetadata
apply(
  batchWatermarkMs: Long,
  batchTimestampMs: Long,
  sessionConf: RuntimeConfig): OffsetSeqMetadata
apply finds the relevantSQLConfs (the checkpointed properties) in the given RuntimeConfig and creates an OffsetSeqMetadata with the batch watermark, the batch timestamp, and the relevant configuration properties found in the session configuration.
apply is used when:
- MicroBatchExecution is requested to populateStartOffsets (while restarting a streaming query with checkpointed offsets)
- StreamExecution is created and requested to runStream
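The lookup that apply performs can be sketched as follows, under stated assumptions: the session configuration is modelled as a plain Map[String, String] rather than Spark's RuntimeConfig, ApplySketch and Metadata are hypothetical names, and the keys are copied from the Checkpointed Properties list. Keys missing from the map are simply skipped in this sketch.

```scala
// Hypothetical sketch (not Spark's code): the session configuration is a
// plain Map[String, String] instead of Spark's RuntimeConfig.
object ApplySketch {
  // The checkpointed property keys listed under "Checkpointed Properties"
  val relevantKeys: Seq[String] = Seq(
    "spark.sql.shuffle.partitions",
    "spark.sql.streaming.aggregation.stateFormatVersion",
    "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion",
    "spark.sql.streaming.join.stateFormatVersion",
    "spark.sql.streaming.multipleWatermarkPolicy",
    "spark.sql.streaming.statefulOperator.useStrictDistribution",
    "spark.sql.streaming.stateStore.compression.codec",
    "spark.sql.streaming.stateStore.providerClass",
    "spark.sql.streaming.stateStore.rocksdb.formatVersion"
  )

  // Hypothetical result type mirroring the three OffsetSeqMetadata fields
  final case class Metadata(
      batchWatermarkMs: Long,
      batchTimestampMs: Long,
      conf: Map[String, String])

  def apply(
      batchWatermarkMs: Long,
      batchTimestampMs: Long,
      sessionConf: Map[String, String]): Metadata = {
    // Keep only the checkpointed keys; keys absent from the map are skipped
    val relevant = relevantKeys.flatMap(key => sessionConf.get(key).map(key -> _)).toMap
    Metadata(batchWatermarkMs, batchTimestampMs, relevant)
  }
}
```

For example, ApplySketch(100L, 200L, Map("spark.sql.shuffle.partitions" -> "8", "spark.app.name" -> "demo")) records only spark.sql.shuffle.partitions, since spark.app.name is not a checkpointed property.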
Checkpointed Properties
OffsetSeqMetadata makes the following configuration properties checkpointed: they can only be set once and cannot change after a streaming query has been started.
- spark.sql.shuffle.partitions (Spark SQL)
- spark.sql.streaming.aggregation.stateFormatVersion
- spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion
- spark.sql.streaming.join.stateFormatVersion
- spark.sql.streaming.multipleWatermarkPolicy
- spark.sql.streaming.statefulOperator.useStrictDistribution
- spark.sql.streaming.stateStore.compression.codec
- spark.sql.streaming.stateStore.providerClass
- spark.sql.streaming.stateStore.rocksdb.formatVersion
apply looks up these configuration properties in the session configuration while creating an OffsetSeqMetadata.
Their values are persisted into the offset log in the checkpoint location.
When a streaming query is restarted from that checkpoint location, the persisted values take effect again, overriding the current values of these properties in the session (if set).
Updating RuntimeConfig with Metadata Properties
setSessionConf(
  metadata: OffsetSeqMetadata,
  sessionConf: RuntimeConfig): Unit
For every relevant SQL property set in the given OffsetSeqMetadata, setSessionConf sets that value in the given RuntimeConfig. If the RuntimeConfig already holds a different value, setSessionConf prints out the following WARN message to the logs:
Updating the value of conf '[confKey]' in current session from '[sessionValue]' to '[metadataValue]'.
When a relevant property is not found in the given OffsetSeqMetadata but has a default value defined, setSessionConf sets that default value in the given RuntimeConfig and prints out the following WARN message to the logs:
Conf '[confKey]' was not found in the offset log, using default value '[defaultValue]'
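Otherwise (the property is neither in the OffsetSeqMetadata nor has a default value), setSessionConf leaves the RuntimeConfig as it is and prints out one of the following WARN messages to the logs, depending on whether the property is set in the given RuntimeConfig or not: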
Conf '[confKey]' was not found in the offset log. Using existing session conf value '[v]'.
Conf '[confKey]' was not found in the offset log. No value set in session conf.
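The three branches of setSessionConf can be sketched as a single function. This is a sketch under stated assumptions, not Spark's code: the session configuration is a mutable Map instead of RuntimeConfig, warnings are collected and returned instead of being logged, and the table of default values is passed in as a parameter (which properties actually have defaults is not listed here). SetSessionConfSketch is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical sketch of setSessionConf's decision logic (not Spark's code).
object SetSessionConfSketch {
  def setSessionConf(
      metadataConf: Map[String, String],        // conf recorded in the offset log
      sessionConf: mutable.Map[String, String], // current session configuration
      relevantKeys: Seq[String],                // checkpointed property keys
      defaults: Map[String, String]             // assumed table of default values
  ): List[String] = {
    val warnings = mutable.ArrayBuffer.empty[String]
    relevantKeys.foreach { confKey =>
      metadataConf.get(confKey) match {
        case Some(metadataValue) =>
          // The checkpointed value wins; warn only if it replaces a different session value
          sessionConf.get(confKey).filter(_ != metadataValue).foreach { sessionValue =>
            warnings += s"Updating the value of conf '$confKey' in current session from '$sessionValue' to '$metadataValue'."
          }
          sessionConf(confKey) = metadataValue
        case None =>
          defaults.get(confKey) match {
            case Some(defaultValue) =>
              // Not in the offset log, but a default is known: make it effective
              sessionConf(confKey) = defaultValue
              warnings += s"Conf '$confKey' was not found in the offset log, using default value '$defaultValue'"
            case None =>
              // Neither checkpointed nor defaulted: the session stays unchanged
              val detail = sessionConf.get(confKey)
                .map(v => s"Using existing session conf value '$v'.")
                .getOrElse("No value set in session conf.")
              warnings += s"Conf '$confKey' was not found in the offset log. $detail"
          }
      }
    }
    warnings.toList
  }
}
```

For example, with a checkpointed a -> 2 and a session value a -> 1, the session ends up with a -> 2 and the "Updating the value of conf" warning is recorded.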
setSessionConf is used when:
- MicroBatchExecution is requested to populateStartOffsets (while restarting a streaming query with checkpointed offsets)