OffsetSeqMetadata¶
OffsetSeqMetadata is the metadata of a streaming batch.
OffsetSeqMetadata is persisted in the write-ahead offset log.
Creating Instance¶
OffsetSeqMetadata takes the following to be created:
- Batch Watermark
- Batch Timestamp
- Configuration (default: empty)
OffsetSeqMetadata is created using apply.
Batch Watermark¶
OffsetSeqMetadata is given the current batch's event-time watermark when created. Unless given, it is assumed to be 0.
Batch Timestamp¶
OffsetSeqMetadata is given the current batch's timestamp when created. Unless given, it is assumed to be 0.
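The three inputs and their defaults can be modeled with a plain Scala case class. This is a minimal sketch only: `BatchMetadataSketch` is a hypothetical stand-in and not the Spark class, and the field name `conf` is an assumption made for illustration.

```scala
// Hypothetical stand-in for OffsetSeqMetadata (NOT the Spark class itself).
// The field name `conf` is an assumption made for this sketch.
final case class BatchMetadataSketch(
    batchWatermarkMs: Long = 0L,            // event-time watermark, 0 unless given
    batchTimestampMs: Long = 0L,            // batch timestamp, 0 unless given
    conf: Map[String, String] = Map.empty)  // configuration, empty by default

object BatchMetadataDemo {
  def main(args: Array[String]): Unit = {
    // No arguments: every field falls back to its default.
    val empty = BatchMetadataSketch()
    // Named arguments: only the configuration keeps its default.
    val batch = BatchMetadataSketch(
      batchWatermarkMs = 5000L,
      batchTimestampMs = 1700000000000L)
    println(empty)
    println(batch)
  }
}
```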
Creating OffsetSeqMetadata¶
apply(
  batchWatermarkMs: Long,
  batchTimestampMs: Long,
  sessionConf: RuntimeConfig): OffsetSeqMetadata
apply finds the relevantSQLConfs in the given RuntimeConfig and creates an OffsetSeqMetadata (with the batch watermark and timestamp, and the relevant configs found in the session config).
apply is used when:
- MicroBatchExecution is requested to populateStartOffsets (while restarting a streaming query with checkpointed offsets)
- StreamExecution is created and requested to runStream
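The selection step that apply performs can be sketched without Spark. The model below is a simplification under stated assumptions: an immutable `Map` stands in for RuntimeConfig (so, unlike a real session config, it has no built-in defaults), `relevantKeys` lists only a hypothetical subset of the relevant properties, and `ApplySketch`, `Metadata` and `create` are illustrative names rather than Spark APIs.

```scala
object ApplySketch {
  // Hypothetical subset of the relevant (checkpointed) properties.
  val relevantKeys: Seq[String] = Seq(
    "spark.sql.shuffle.partitions",
    "spark.sql.streaming.multipleWatermarkPolicy")

  // Simplified stand-in for OffsetSeqMetadata.
  final case class Metadata(
      batchWatermarkMs: Long,
      batchTimestampMs: Long,
      conf: Map[String, String])

  // Keeps only the relevant properties that are present in the session config;
  // everything else in the session config is ignored.
  def create(
      batchWatermarkMs: Long,
      batchTimestampMs: Long,
      sessionConf: Map[String, String]): Metadata = {
    val relevant = relevantKeys.flatMap(k => sessionConf.get(k).map(v => k -> v)).toMap
    Metadata(batchWatermarkMs, batchTimestampMs, relevant)
  }

  def main(args: Array[String]): Unit = {
    val session = Map(
      "spark.sql.shuffle.partitions" -> "8",
      "spark.app.name" -> "demo") // not a relevant property, so it is dropped
    println(create(0L, 0L, session))
  }
}
```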
Checkpointed Properties¶
OffsetSeqMetadata allows the following configuration properties to be set only once; they can never change after a streaming query is started.
- spark.sql.shuffle.partitions (Spark SQL)
- spark.sql.streaming.aggregation.stateFormatVersion
- spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion
- spark.sql.streaming.join.stateFormatVersion
- spark.sql.streaming.multipleWatermarkPolicy
- spark.sql.streaming.statefulOperator.useStrictDistribution
- spark.sql.streaming.stateStore.compression.codec
- spark.sql.streaming.stateStore.providerClass
- spark.sql.streaming.stateStore.rocksdb.formatVersion
The configuration properties are searched for while creating an OffsetSeqMetadata.
The values of these configs are persisted into the offset log in the checkpoint location.
Once the values are persisted in a checkpoint location, restarting a streaming query puts them back in effect (overriding any current values of the properties if set).
Updating RuntimeConfig with Metadata Properties¶
setSessionConf(
  metadata: OffsetSeqMetadata,
  sessionConf: RuntimeConfig): Unit
For every relevant SQL property that is set in the given OffsetSeqMetadata, setSessionConf overrides the value in the given RuntimeConfig. If the RuntimeConfig already had a value for the property, setSessionConf prints out the following WARN message to the logs:
Updating the value of conf '[confKey]' in current session from '[sessionValue]' to '[metadataValue]'.
For a property that is not set in the given OffsetSeqMetadata but has a default value, setSessionConf makes the default value effective in the given RuntimeConfig and prints out the following WARN message to the logs:
Conf '[confKey]' was not found in the offset log, using default value '[defaultValue]'
Otherwise, setSessionConf leaves the RuntimeConfig unchanged and prints out one of the following WARN messages to the logs, based on whether the property is set in the given RuntimeConfig or not:
Conf '[confKey]' was not found in the offset log. Using existing session conf value '[v]'.
Conf '[confKey]' was not found in the offset log. No value set in session conf.
setSessionConf is used when:
- MicroBatchExecution is requested to populateStartOffsets (while restarting a streaming query with checkpointed offsets)
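The three cases described for setSessionConf can be walked through with a small Spark-free model. It is a sketch under assumptions, not the Spark implementation: a mutable `Map` stands in for RuntimeConfig, `relevantKeys` and `defaults` are hypothetical inputs chosen for illustration, the update warning is assumed to be emitted only when the session value differs from the checkpointed one, and the function returns the WARN messages instead of logging them.

```scala
import scala.collection.mutable

object SetSessionConfSketch {
  // Hypothetical inputs: a subset of relevant properties and an assumed default.
  val relevantKeys: Seq[String] = Seq(
    "spark.sql.shuffle.partitions",
    "spark.sql.streaming.multipleWatermarkPolicy",
    "spark.sql.streaming.stateStore.providerClass")
  val defaults: Map[String, String] =
    Map("spark.sql.streaming.multipleWatermarkPolicy" -> "min")

  // Mutates sessionConf and returns the WARN messages that would be logged.
  def setSessionConf(
      metadataConf: Map[String, String],
      sessionConf: mutable.Map[String, String]): Seq[String] =
    relevantKeys.flatMap { key =>
      val warning: Option[String] = metadataConf.get(key) match {
        case Some(metadataValue) =>
          // Case 1: the checkpointed value wins over the session value.
          val msg = sessionConf.get(key).filter(_ != metadataValue).map { sessionValue =>
            s"Updating the value of conf '$key' in current session " +
              s"from '$sessionValue' to '$metadataValue'."
          }
          sessionConf(key) = metadataValue
          msg
        case None =>
          defaults.get(key) match {
            case Some(defaultValue) =>
              // Case 2: not checkpointed, but a default exists.
              sessionConf(key) = defaultValue
              Some(s"Conf '$key' was not found in the offset log, " +
                s"using default value '$defaultValue'")
            case None =>
              // Case 3: not checkpointed, no default; the session is left as-is.
              val suffix = sessionConf.get(key)
                .map(v => s"Using existing session conf value '$v'.")
                .getOrElse("No value set in session conf.")
              Some(s"Conf '$key' was not found in the offset log. $suffix")
          }
      }
      warning.toList
    }

  def main(args: Array[String]): Unit = {
    val session = mutable.Map("spark.sql.shuffle.partitions" -> "200")
    val checkpointed = Map("spark.sql.shuffle.partitions" -> "8")
    setSessionConf(checkpointed, session).foreach(println)
    println(session)
  }
}
```

Returning the messages rather than logging them keeps the sketch easy to check; a restart scenario is modeled simply by passing the checkpointed values as `metadataConf`.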