Configuration Properties¶
Configuration properties (aka settings) allow you to fine-tune a Spark Structured Streaming application.
Learn more about Configuration Properties in The Internals of Spark SQL.
aggregation.stateFormatVersion¶
spark.sql.streaming.aggregation.stateFormatVersion
(internal) Version of the state format (and a StreamingAggregationStateManagerBaseImpl)
Default: 2
Supported values:
- 1 (for the legacy StreamingAggregationStateManagerImplV1)
- 2 (for the default StreamingAggregationStateManagerImplV2)
Used when:
- StatefulAggregationStrategy execution planning strategy is executed (and plans a streaming query with a non-windowed aggregate)
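As a sketch (assuming an active SparkSession named spark, as in spark-shell), the legacy format can be pinned before a query's first run; the version in effect at the first run is the one restarts keep using:

```scala
// Select the legacy (V1) state format for streaming aggregations.
// Set before the query first starts; do not change it afterwards.
spark.conf.set("spark.sql.streaming.aggregation.stateFormatVersion", "1")
```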
checkpointFileManagerClass¶
spark.sql.streaming.checkpointFileManagerClass
(internal) CheckpointFileManager to use to write checkpoint files atomically
Default: (undefined)
Unless defined, FileContextBasedCheckpointFileManager is considered first, with FileSystemBasedCheckpointFileManager as the fallback when the file system used for storing metadata files does not support the former
Used when:
- CheckpointFileManager is requested to create a CheckpointFileManager
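A minimal sketch of plugging in a custom implementation (the class name is hypothetical; it must implement org.apache.spark.sql.execution.streaming.CheckpointFileManager):

```scala
// Hypothetical implementation class, used for illustration only
spark.conf.set(
  "spark.sql.streaming.checkpointFileManagerClass",
  "com.example.MyAtomicCheckpointFileManager")
```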
checkpointLocation¶
spark.sql.streaming.checkpointLocation
Default checkpoint directory for storing checkpoint data
Default: (empty)
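A minimal sketch of setting a session-wide default (the /tmp path is illustrative); every query that does not specify the checkpointLocation option checkpoints under a subdirectory of it:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("checkpoint-location-demo") // illustrative
  .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
  .getOrCreate()

// No explicit checkpointLocation option, so the query checkpoints
// under /tmp/checkpoints/<random subdirectory>
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .start()
```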
commitProtocolClass¶
spark.sql.streaming.commitProtocolClass
(internal) FileCommitProtocol to use for writing out micro-batches in FileStreamSink.
Default: org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
Use SQLConf.streamingFileCommitProtocolClass to access the current value.
Learn more about FileCommitProtocol in The Internals of Apache Spark.
continuous.executorQueueSize¶
spark.sql.streaming.continuous.executorQueueSize
(internal) The size (measured in number of rows) of the queue used in continuous execution to buffer the results of a ContinuousDataReader.
Default: 1024
continuous.executorPollIntervalMs¶
spark.sql.streaming.continuous.executorPollIntervalMs
(internal) The interval (in millis) at which continuous execution readers will poll to check whether the epoch has advanced on the driver.
Default: 100 (ms)
disabledV2MicroBatchReaders¶
spark.sql.streaming.disabledV2MicroBatchReaders
(internal) A comma-separated list of fully-qualified class names of data source providers for which MicroBatchStream is disabled. Reads from these sources will fall back to the V1 Sources.
Default: (empty)
Use SQLConf.disabledV2StreamingMicroBatchReaders to get the current value.
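A sketch of forcing the V1 read path for a single provider (assuming a spark-shell session; the source must have a V1 fallback for the query to work):

```scala
// Comma-separated, fully-qualified provider class names
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")
```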
fileSink.log.cleanupDelay¶
spark.sql.streaming.fileSink.log.cleanupDelay
(internal) How long (in millis) a file is guaranteed to be visible to all readers.
Default: 10 minutes
Use SQLConf.fileSinkLogCleanupDelay to access the current value.
fileSink.log.deletion¶
spark.sql.streaming.fileSink.log.deletion
(internal) Whether to delete expired log files in the file stream sink
Default: true
Use SQLConf.fileSinkLogDeletion to access the current value.
fileSink.log.compactInterval¶
spark.sql.streaming.fileSink.log.compactInterval
(internal) Number of log files after which all the previous files are compacted into the next log file
Default: 10
Use SQLConf.fileSinkLogCompactInterval to access the current value.
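The three fileSink.log properties above work together. A sketch with illustrative values:

```scala
// Compact the sink metadata log every 20 log files
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "20")
// Keep expired log files visible to readers for 5 minutes (in millis)
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "300000")
// Delete the expired log files (the default)
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")
```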
fileSource.log.cleanupDelay¶
spark.sql.streaming.fileSource.log.cleanupDelay
(internal) How long (in millis) a file is guaranteed to be visible to all readers.
Default: 10 (minutes)
Use SQLConf.fileSourceLogCleanupDelay to get the current value.
fileSource.log.compactInterval¶
spark.sql.streaming.fileSource.log.compactInterval
(internal) Number of log files after which all the previous files are compacted into the next log file.
Default: 10
Must be a positive value (greater than 0)
Use SQLConf.fileSourceLogCompactInterval to get the current value.
fileSource.log.deletion¶
spark.sql.streaming.fileSource.log.deletion
(internal) Whether to delete expired log files in the file stream source
Default: true
Use SQLConf.fileSourceLogDeletion to get the current value.
flatMapGroupsWithState.stateFormatVersion¶
spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion
(internal) State format version used to create a StateManager for FlatMapGroupsWithStateExec physical operator
Default: 2
Supported values:
- 1
- 2
Used when:
- FlatMapGroupsWithStateStrategy execution planning strategy is requested to plan a streaming query (and creates a FlatMapGroupsWithStateExec physical operator for every FlatMapGroupsWithState logical operator)
join.stateFormatVersion¶
spark.sql.streaming.join.stateFormatVersion
(internal) State format version used by streaming join operations in a streaming query. State between versions tends to be incompatible, so the state format version should not be modified after a query has run.
Default: 2
Supported values:
- 1
- 2
kafka.useDeprecatedOffsetFetching¶
spark.sql.streaming.kafka.useDeprecatedOffsetFetching
(internal) When enabled (true), the deprecated Kafka Consumer-based offset fetching is used (using KafkaOffsetReaderConsumer), which can cause an infinite wait in Spark queries (leaving a query restart as the only workaround). Otherwise, KafkaOffsetReaderAdmin is used.
Default: true
Use SQLConf.useDeprecatedKafkaOffsetFetching for the current value
Used when:
- KafkaOffsetReader utility is used to create a KafkaOffsetReader
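A sketch of opting in to the AdminClient-based offset fetching to avoid the potential infinite wait (broker address and topic are illustrative):

```scala
// Use KafkaOffsetReaderAdmin instead of the deprecated Consumer-based reader
spark.conf.set("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative
  .option("subscribe", "events")                       // hypothetical topic
  .load()
```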
maxBatchesToRetainInMemory¶
spark.sql.streaming.maxBatchesToRetainInMemory
(internal) The maximum number of batches which will be retained in memory to avoid loading from files.
Default: 2
Maximum count of versions a State Store implementation should retain in memory.
The value adjusts a trade-off between memory usage and cache misses:
- 2 covers both success and direct failure cases
- 1 covers only the success case
- 0 or a negative value disables the cache to maximize the memory available to executors
Used when:
- HDFSBackedStateStoreProvider is requested to initialize
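A sketch of the trade-off described above (disabling the cache trades executor memory for re-loading state versions from checkpoint files):

```scala
// 2 (default) caches the last two state versions (success and direct failure);
// 0 disables the cache entirely
spark.conf.set("spark.sql.streaming.maxBatchesToRetainInMemory", "0")
```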
metricsEnabled¶
spark.sql.streaming.metricsEnabled
Enables streaming metrics
Default: false
Use SQLConf.streamingMetricsEnabled to access the current value
Used when:
- StreamExecution is requested to run (and registers a MetricsReporter)
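A minimal sketch (assuming a spark-shell session); once enabled, every streaming query that starts afterwards reports its metrics through Spark's metrics system:

```scala
// Registers a MetricsReporter per streaming query at start
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```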
minBatchesToRetain¶
spark.sql.streaming.minBatchesToRetain
(internal) Minimum number of batches that must be retained and made recoverable
Stream execution engines discard (purge) offsets from the offsets metadata log when the current batch ID (in MicroBatchExecution) or the epoch committed (in ContinuousExecution) is above the threshold.
Default: 100
Use SQLConf.minBatchesToRetain to access the current value.
multipleWatermarkPolicy¶
spark.sql.streaming.multipleWatermarkPolicy
Global watermark policy: how to calculate the global watermark value when there are multiple watermark operators in a streaming query
Default: min
Supported values:
- min chooses the minimum watermark reported across multiple operators
- max chooses the maximum watermark reported across multiple operators
Cannot be changed between query restarts from the same checkpoint location.
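A sketch with two watermarked inputs where the policy matters (thresholds are illustrative). With max, the global watermark follows the operator with the largest watermark value; with min (the default), the smallest:

```scala
import org.apache.spark.sql.functions.{col, window}

// Must be set before the query first starts (see the note above)
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")

val fast = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "10 seconds")
val slow = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "1 minute")

// Two watermark operators feed one aggregation; the global watermark
// is the max of the two per-operator watermarks
val counts = fast.union(slow)
  .groupBy(window(col("timestamp"), "30 seconds"))
  .count()
```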
noDataMicroBatches.enabled¶
spark.sql.streaming.noDataMicroBatches.enabled
Controls whether the streaming micro-batch engine should execute batches with no data to process, for eager state management in stateful streaming queries (true) or not (false).
Default: true
Use SQLConf.streamingNoDataMicroBatchesEnabled to get the current value
noDataProgressEventInterval¶
spark.sql.streaming.noDataProgressEventInterval
(internal) How long to wait (in millis) between two progress events when there is no data, when ProgressReporter is requested to finish a trigger
Default: 10000 (10 seconds)
Use SQLConf.streamingNoDataProgressEventInterval for the current value
numRecentProgressUpdates¶
spark.sql.streaming.numRecentProgressUpdates
Number of StreamingQueryProgresses to retain in the progressBuffer internal registry when ProgressReporter is requested to update progress of a streaming query
Default: 100
Use SQLConf.streamingProgressRetention to get the current value
pollingDelay¶
spark.sql.streaming.pollingDelay
(internal) How long (in millis) StreamExecution delays before polling for new data when no data was available in a batch
Default: 10 (milliseconds)
statefulOperator.useStrictDistribution¶
spark.sql.streaming.statefulOperator.useStrictDistribution
The purpose of this config is only compatibility; DO NOT MANUALLY CHANGE THIS!!!
When true, the stateful operator for a streaming query uses StatefulOpClusteredDistribution, which guarantees stable state partitioning as long as the operator provides consistent grouping keys across the lifetime of the query.
When false, the stateful operator for a streaming query uses ClusteredDistribution, which is not sufficient to guarantee stable state partitioning even if the operator provides consistent grouping keys across the lifetime of the query.
This config is set to true for new streaming queries to guarantee stable state partitioning, and to false for existing streaming queries so as not to break queries restored from existing checkpoints.
Refer to SPARK-38204 for details.
Default: true
Used when:
- StatefulOperatorPartitioning is requested to getCompatibleDistribution
stateStore.compression.codec¶
spark.sql.streaming.stateStore.compression.codec
(internal) The codec used to compress delta and snapshot files generated by StateStore. By default, Spark provides four codecs: lz4, lzf, snappy, and zstd. You can also use fully-qualified class names to specify the codec.
Default: lz4
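A sketch of switching to another built-in codec:

```scala
// zstd is one of the four short codec names accepted here
spark.conf.set("spark.sql.streaming.stateStore.compression.codec", "zstd")
```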
stateStore.maintenanceInterval¶
spark.sql.streaming.stateStore.maintenanceInterval
The initial delay and how often to execute StateStore's maintenance task.
Default: 60s
stateStore.minDeltasForSnapshot¶
spark.sql.streaming.stateStore.minDeltasForSnapshot
(internal) Minimum number of state store delta files that need to be generated before HDFSBackedStateStore will consider generating a snapshot (consolidate the deltas into a snapshot)
Default: 10
Use SQLConf.stateStoreMinDeltasForSnapshot to get the current value.
stateStore.providerClass¶
spark.sql.streaming.stateStore.providerClass
(internal) The fully-qualified class name of a StateStoreProvider implementation
Default: HDFSBackedStateStoreProvider
Use SQLConf.stateStoreProviderClass to get the current value
Used when:
- StateStoreConf is requested for providerClass
- StateStoreWriter is requested to stateStoreCustomMetrics
- StreamingQueryStatisticsPage is requested for the supportedCustomMetrics
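A sketch of switching to the RocksDB-backed provider that ships with Spark 3.2 and later (set it before the query's first run):

```scala
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
```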
stateStore.rocksdb.formatVersion¶
spark.sql.streaming.stateStore.rocksdb.formatVersion
stateStore.rocksdb.trackTotalNumberOfRows¶
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows
ui.enabled¶
spark.sql.streaming.ui.enabled
Enables Structured Streaming Web UI for a Spark application (with Spark Web UI enabled)
Default: true
Used when:
- SharedState (Spark SQL) is created
ui.enabledCustomMetricList¶
spark.sql.streaming.ui.enabledCustomMetricList
(internal) A comma-separated list of the names of the Supported Custom Metrics of stateful operators to render the timeline and histogram of in Structured Streaming UI (in addition to the regular metrics in Streaming Query Statistics)
Default: (empty)
Supported custom metrics are StateStoreProvider-specific (and can be found and monitored using StateOperatorProgress)
statefulOperatorCustomMetrics should be included, too, but it seems they might've been overlooked. To be verified.
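A sketch of enabling two custom metrics (the metric names assume the RocksDB state store provider and are illustrative):

```scala
// Comma-separated names of supported custom metrics of the provider in use
spark.conf.set(
  "spark.sql.streaming.ui.enabledCustomMetricList",
  "rocksdbGetLatency,rocksdbPutLatency")
```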
ui.retainedProgressUpdates¶
spark.sql.streaming.ui.retainedProgressUpdates
Number of progress updates of a streaming query to retain for Structured Streaming UI
Default: 100
Used when:
- StreamingQueryStatusListener is requested to handle a query progress
unsupportedOperationCheck¶
spark.sql.streaming.unsupportedOperationCheck
(internal) When enabled (true), StreamingQueryManager makes sure that the logical plan of a streaming query uses supported operations only
Default: true