UnsupportedOperationChecker¶
UnsupportedOperationChecker checks whether the logical plan of a streaming query uses supported operations only.
UnsupportedOperationChecker is used only when the spark.sql.streaming.unsupportedOperationCheck configuration property is enabled.
Streaming Query Verification¶
checkForStreaming(
  plan: LogicalPlan,
  outputMode: OutputMode): Unit
checkForStreaming asserts that the following requirements hold:
- Streaming aggregation with Append output mode requires watermark (on the grouping expressions)
- Multiple flatMapGroupsWithState operators are only allowed with Append output mode
checkForStreaming is used when:
- StreamingQueryManager is requested to create a StreamingQueryWrapper (for starting a streaming query), but only when the internal spark.sql.streaming.unsupportedOperationCheck configuration property is enabled.
Only One Streaming Aggregation Is Allowed¶
checkForStreaming finds all streaming aggregates (i.e., Aggregate logical operators with streaming sources).
checkForStreaming asserts that there is at most one streaming aggregation in a streaming query.
Otherwise, checkForStreaming reports an AnalysisException:
Multiple streaming aggregations are not supported with streaming DataFrames/Datasets
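The rule above can be sketched with a toy plan tree. This is a minimal illustration, not Spark's code: Plan, Source, Aggregate, UnsupportedOperation and checkSingleStreamingAggregation are hypothetical stand-ins for the Catalyst classes and for AnalysisException.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
object OneAggregationSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def isStreaming: Boolean
    // This node followed by all of its descendants
    def nodes: Seq[Plan] = this +: children.flatMap(_.nodes)
  }
  case class Source(isStreaming: Boolean) extends Plan {
    val children: Seq[Plan] = Nil
  }
  case class Aggregate(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    // An aggregate is streaming when its input is streaming
    def isStreaming: Boolean = child.isStreaming
  }

  // Hypothetical stand-in for AnalysisException
  final case class UnsupportedOperation(message: String) extends Exception(message)

  def checkSingleStreamingAggregation(plan: Plan): Unit = {
    // Find all aggregates over streaming input
    val streamingAggregates = plan.nodes.collect {
      case a: Aggregate if a.isStreaming => a
    }
    // Zero or one streaming aggregate passes; two or more fail
    if (streamingAggregates.size > 1) {
      throw UnsupportedOperation(
        "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets")
    }
  }
}
```

In this sketch, any number of aggregations over a batch (non-streaming) source passes, since only aggregates with a streaming input are counted.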
Streaming Aggregation With Append Output Mode Requires Watermark¶
checkForStreaming asserts that a watermark is defined for a streaming aggregation with Append output mode (on at least one of the grouping expressions).
Otherwise, checkForStreaming reports an AnalysisException:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
CAUTION: FIXME
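For illustration, a query that should satisfy this requirement could look as follows. This is a sketch that assumes Spark SQL is on the classpath. It uses the built-in rate source (with its timestamp column) and the console sink, and the object name, delays and timeout are arbitrary choices made for the example.

```scala
// Assumes the spark-sql dependency is available; names and durations are illustrative.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object WatermarkedAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("watermarked-aggregation-sketch")
      .getOrCreate()

    val counts = spark.readStream
      .format("rate") // built-in source with `timestamp` and `value` columns
      .load()
      // The watermark is defined on the column used in the grouping expression
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window(col("timestamp"), "5 seconds"))
      .count()

    val query = counts.writeStream
      .format("console")
      .outputMode("append") // Append mode with a streaming aggregation
      .start()

    // Run for a bounded time only, then shut down
    query.awaitTermination(30000)
    query.stop()
    spark.stop()
  }
}
```

Removing the withWatermark line from this sketch is expected to make the query fail at start with the AnalysisException above.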
Multiple flatMapGroupsWithState Operators Are Only Allowed With Append Output Mode¶
checkForStreaming counts all FlatMapGroupsWithState logical operators (on streaming Datasets with isMapGroupsWithState flag disabled).
checkForStreaming asserts that multiple FlatMapGroupsWithState logical operators are only used when:
- outputMode is Append output mode
- outputMode of the FlatMapGroupsWithState logical operators is also Append output mode
Otherwise, checkForStreaming reports an AnalysisException:
Multiple flatMapGroupsWithStates are not supported when they are not all in append mode or the output mode is not append on a streaming DataFrames/Datasets
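The condition can be sketched as a small predicate. This is a simplified model under stated assumptions: Mode, StatefulFlatMap and isAllowed are hypothetical names, and the operator is flattened to the three properties the rule refers to.

```scala
// Toy model only: a flattened view of FlatMapGroupsWithState, not Spark's class.
object MultipleFlatMapGroupsWithStateSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode

  final case class StatefulFlatMap(
      outputMode: Mode,
      isStreaming: Boolean,
      isMapGroupsWithState: Boolean)

  def isAllowed(queryOutputMode: Mode, operators: Seq[StatefulFlatMap]): Boolean = {
    // Only streaming operators with the isMapGroupsWithState flag disabled are counted
    val counted = operators.filter(op => op.isStreaming && !op.isMapGroupsWithState)
    // A single operator is not restricted by this rule; multiple operators
    // require Append for the query and for every counted operator
    counted.size < 2 ||
      (queryOutputMode == Append && counted.forall(_.outputMode == Append))
  }
}
```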
checkForStreamStreamJoinWatermark¶
checkForStreamStreamJoinWatermark(
  join: Join): Unit
checkForStreamStreamJoinWatermark...FIXME
checkStreamingQueryGlobalWatermarkLimit¶
checkStreamingQueryGlobalWatermarkLimit(
  plan: LogicalPlan,
  outputMode: OutputMode): Unit
checkStreamingQueryGlobalWatermarkLimit looks for stateful operators in the given logical query plan that are combined with another stateful operation that can possibly emit late rows. When it finds such a pattern, it throws an AnalysisException.
With spark.sql.streaming.statefulOperator.checkCorrectness.enabled enabled, checkStreamingQueryGlobalWatermarkLimit propagates the exception (up the call chain). Otherwise, it prints out the following WARN message:
Detected pattern of possible 'correctness' issue due to global watermark.
The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay,
which are "late rows" in downstream stateful operations and these rows can be discarded.
Please refer the programming guide doc for more details.
If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
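As the message says, the check can be downgraded to a warning through configuration. A minimal fragment, assuming Spark SQL is on the classpath and that the property is set when the session is built (the local master is only an assumption for the example):

```scala
import org.apache.spark.sql.SparkSession

// With the property set to false, the detected pattern is only logged as a WARN message
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
  .getOrCreate()
```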
isStatefulOperation¶
isStatefulOperation(
  p: LogicalPlan): Boolean
isStatefulOperation is positive (true) for the following logical operators:
- Aggregate (Spark SQL) over a streaming data source
- Join (Spark SQL) with left and right streaming data sources
- FlatMapGroupsWithState over a streaming data source
- Deduplicate over a streaming data source
Otherwise, isStatefulOperation is negative (false).
isStatefulOperationPossiblyEmitLateRows¶
isStatefulOperationPossiblyEmitLateRows(
  p: LogicalPlan): Boolean
isStatefulOperationPossiblyEmitLateRows is positive (true) for the following logical operators:
- Aggregate (Spark SQL) over a streaming data source with Append output mode
- Join (Spark SQL) with left and right streaming data sources for all but Inner join types
- FlatMapGroupsWithState over a streaming data source with the outputMode as Append
Otherwise, isStatefulOperationPossiblyEmitLateRows is negative (false).
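The two predicates and the check that uses them can be sketched on a toy plan tree. This is an illustration under assumptions, not Spark's implementation: all types are hypothetical stand-ins, join types are plain strings, IllegalStateException replaces AnalysisException, and the sketch assumes that the late-row-emitting operation sits somewhere beneath the stateful operator in the plan.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
object GlobalWatermarkLimitSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode

  sealed trait Plan {
    def children: Seq[Plan]
    def isStreaming: Boolean = children.exists(_.isStreaming)
    // This node followed by all of its descendants
    def nodes: Seq[Plan] = this +: children.flatMap(_.nodes)
  }
  case class Source(streaming: Boolean) extends Plan {
    val children: Seq[Plan] = Nil
    override def isStreaming: Boolean = streaming
  }
  case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  case class Deduplicate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  case class FlatMapGroupsWithState(child: Plan, outputMode: Mode) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  case class Join(left: Plan, right: Plan, joinType: String) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  def isStatefulOperation(p: Plan): Boolean = p match {
    case a: Aggregate              => a.isStreaming
    case Join(left, right, _)      => left.isStreaming && right.isStreaming
    case f: FlatMapGroupsWithState => f.isStreaming
    case d: Deduplicate            => d.isStreaming
    case _                         => false
  }

  // outputMode is the output mode of the whole query
  def isStatefulOperationPossiblyEmitLateRows(p: Plan, outputMode: Mode): Boolean = p match {
    case a: Aggregate              => a.isStreaming && outputMode == Append
    case Join(left, right, tpe)    => left.isStreaming && right.isStreaming && tpe != "inner"
    case f: FlatMapGroupsWithState => f.isStreaming && f.outputMode == Append
    case _                         => false
  }

  // True when a stateful operator has a different operator below it that may emit late rows
  def hasPossibleCorrectnessIssue(plan: Plan, outputMode: Mode): Boolean =
    plan.nodes.exists { op =>
      isStatefulOperation(op) &&
        op.nodes.exists(p => (p ne op) && isStatefulOperationPossiblyEmitLateRows(p, outputMode))
    }

  // failWhenDetected models spark.sql.streaming.statefulOperator.checkCorrectness.enabled
  def check(plan: Plan, outputMode: Mode, failWhenDetected: Boolean): Unit =
    if (hasPossibleCorrectnessIssue(plan, outputMode)) {
      val msg = "Detected pattern of possible 'correctness' issue due to global watermark."
      if (failWhenDetected) throw new IllegalStateException(msg)
      else println(s"WARN $msg")
    }
}
```

For example, in this model a Deduplicate on top of a streaming Aggregate is flagged in Append output mode but not in Update output mode, because the aggregate counts as possibly emitting late rows only with Append.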
UnsupportedOperationsSuite¶
Review UnsupportedOperationsSuite in the source code of Apache Spark.
$ SBT_MAVEN_PROFILES="-Pyarn,kubernetes,hive,hive-thriftserver,scala-2.13,hadoop-cloud" \
sbt "catalyst/testOnly org.apache.spark.sql.catalyst.analysis.UnsupportedOperationsSuite"
...
[info] Run completed in 4 seconds, 27 milliseconds.
[info] Total number of tests run: 183
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 183, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
Learn more in the official documentation of Apache Spark.