
UnsupportedOperationChecker

UnsupportedOperationChecker checks whether the logical plan of a streaming query uses supported operations only.

UnsupportedOperationChecker is used only when spark.sql.streaming.unsupportedOperationCheck configuration property is enabled.

Streaming Query Verification

checkForStreaming(
  plan: LogicalPlan,
  outputMode: OutputMode): Unit

checkForStreaming asserts that the following requirements hold:

  1. Only one streaming aggregation is allowed

  2. Streaming aggregation with Append output mode requires watermark (on the grouping expressions)

  3. Multiple flatMapGroupsWithState operators are only allowed with Append output mode


checkForStreaming is used when:

Only One Streaming Aggregation Is Allowed

checkForStreaming finds all streaming aggregates (i.e., Aggregate logical operators with streaming sources).

checkForStreaming asserts that there is at most one streaming aggregation in a streaming query.

Otherwise, checkForStreaming throws an AnalysisException:

Multiple streaming aggregations are not supported with streaming DataFrames/Datasets
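The counting rule can be sketched as a small toy model. This is not Spark code: `Plan`, `Source`, `Project`, `Aggregate`, `nodes` and `ToyAnalysisException` are hypothetical stand-ins for `LogicalPlan`, its operators and `AnalysisException`, and a node is assumed to be streaming whenever its child is.

```scala
// Toy model only: hypothetical stand-ins for LogicalPlan and its operators.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean
}
final case class Source(isStreaming: Boolean) extends Plan {
  val children: Seq[Plan] = Nil
}
final case class Project(child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
  def isStreaming: Boolean = child.isStreaming
}
final case class Aggregate(child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
  def isStreaming: Boolean = child.isStreaming
}

// Hypothetical stand-in for org.apache.spark.sql.AnalysisException
final class ToyAnalysisException(msg: String) extends Exception(msg)

object OneStreamingAggregation {
  // Every node of the tree, root first
  def nodes(p: Plan): Seq[Plan] = p +: p.children.flatMap(nodes)

  def check(plan: Plan): Unit = {
    // Only Aggregate operators over streaming sources are counted
    val streamingAggregates = nodes(plan).collect { case a: Aggregate if a.isStreaming => a }
    if (streamingAggregates.size > 1)
      throw new ToyAnalysisException(
        "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets")
  }
}
```

In the toy model, aggregations over a batch-only source are ignored, which mirrors the "with streaming sources" qualifier above.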

Streaming Aggregation With Append Output Mode Requires Watermark

checkForStreaming asserts that, in Append output mode, every streaming aggregation has a watermark defined on at least one of its grouping expressions.

Otherwise, checkForStreaming throws an AnalysisException:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

CAUTION: FIXME
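The watermark requirement can be sketched as a toy check. It is a simplified model, not Spark's implementation: `Column`, `StreamingAggregate`, `Mode` and `AppendNeedsWatermark` are hypothetical, a boolean `hasWatermark` flag stands in for however Spark marks watermarked columns, and `IllegalStateException` stands in for `AnalysisException`.

```scala
// Toy model only: hypothetical stand-ins, not Spark classes.
final case class Column(name: String, hasWatermark: Boolean)
final case class StreamingAggregate(groupingColumns: Seq[Column])

sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

object AppendNeedsWatermark {
  def check(aggregates: Seq[StreamingAggregate], mode: Mode): Unit =
    // The requirement only applies to Append output mode
    if (mode == Append) {
      aggregates.foreach { agg =>
        // At least one grouping column must carry a watermark
        if (!agg.groupingColumns.exists(_.hasWatermark))
          throw new IllegalStateException(
            "Append output mode not supported when there are streaming aggregations on " +
              "streaming DataFrames/DataSets without watermark")
      }
    }
}
```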

Multiple flatMapGroupsWithState Operators Are Only Allowed With Append Output Mode

checkForStreaming counts all FlatMapGroupsWithState logical operators on streaming Datasets that have the isMapGroupsWithState flag disabled (i.e., flatMapGroupsWithState rather than mapGroupsWithState).

checkForStreaming asserts that multiple FlatMapGroupsWithState logical operators are allowed only when both of the following hold:

  • outputMode is Append output mode
  • the outputMode of every FlatMapGroupsWithState logical operator is also Append output mode

Otherwise, checkForStreaming throws an AnalysisException:

Multiple flatMapGroupsWithStates are not supported when they are not all in append mode or the output mode is not append on a streaming DataFrames/Datasets
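The rule above can be sketched as a toy check over a flat list of operators. This is a simplified model under stated assumptions: `FlatMapGroupsWithStateNode`, `Mode` and `MultipleFlatMapGroupsWithState` are hypothetical names, the plan tree is reduced to a `Seq`, and `IllegalStateException` stands in for `AnalysisException`.

```scala
// Toy model only: hypothetical stand-ins, not Spark classes.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode

final case class FlatMapGroupsWithStateNode(
    isStreaming: Boolean,
    isMapGroupsWithState: Boolean,
    outputMode: Mode)

object MultipleFlatMapGroupsWithState {
  def check(nodes: Seq[FlatMapGroupsWithStateNode], queryMode: Mode): Unit = {
    // Count only streaming flatMapGroupsWithState (not mapGroupsWithState) operators
    val fmgws = nodes.filter(n => n.isStreaming && !n.isMapGroupsWithState)
    if (fmgws.size > 1) {
      // Both the query and every operator must use Append output mode
      val allAppend = queryMode == Append && fmgws.forall(_.outputMode == Append)
      if (!allAppend)
        throw new IllegalStateException(
          "Multiple flatMapGroupsWithStates are not supported when they are not all in " +
            "append mode or the output mode is not append on a streaming DataFrames/Datasets")
    }
  }
}
```

A single operator is never rejected by this rule, whatever its output mode.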

checkForStreamStreamJoinWatermark

checkForStreamStreamJoinWatermark(
  join: Join): Unit

checkForStreamStreamJoinWatermark...FIXME

checkStreamingQueryGlobalWatermarkLimit

checkStreamingQueryGlobalWatermarkLimit(
  plan: LogicalPlan,
  outputMode: OutputMode): Unit

checkStreamingQueryGlobalWatermarkLimit looks for a stateful operator in the given logical query plan that has, below it (upstream), another stateful operation that can possibly emit late rows. When it finds one, it throws an AnalysisException.

With spark.sql.streaming.statefulOperator.checkCorrectness.enabled enabled, checkStreamingQueryGlobalWatermarkLimit propagates the AnalysisException up the call chain. Otherwise, it prints out the following WARN message:

Detected pattern of possible 'correctness' issue due to global watermark.
The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay,
which are "late rows" in downstream stateful operations and these rows can be discarded.
Please refer the programming guide doc for more details.
If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
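The detection and the throw-or-warn behaviour can be sketched as a toy model. It is not Spark's implementation: `Node`, `CorrectnessIssue` and `GlobalWatermarkLimit` are hypothetical, and the two predicates are passed in as parameters (standing in for isStatefulOperation and isStatefulOperationPossiblyEmitLateRows) so the sketch makes no claim about which operators they match. `failWhenDetected` plays the role of spark.sql.streaming.statefulOperator.checkCorrectness.enabled.

```scala
// Toy model only: hypothetical stand-ins, not Spark classes.
final case class Node(name: String, children: Seq[Node] = Nil) {
  // All nodes of this subtree, this node first
  def descendants: Seq[Node] = this +: children.flatMap(_.descendants)
}

// Hypothetical stand-in for org.apache.spark.sql.AnalysisException
final class CorrectnessIssue(msg: String) extends Exception(msg)

object GlobalWatermarkLimit {
  def check(
      plan: Node,
      isStateful: Node => Boolean,
      possiblyEmitsLateRows: Node => Boolean,
      failWhenDetected: Boolean,
      warn: String => Unit): Unit =
    try {
      plan.descendants.filter(isStateful).foreach { op =>
        // Look strictly below `op` for an operator that may emit late rows
        val upstream = op.children.flatMap(_.descendants)
        if (upstream.exists(possiblyEmitsLateRows))
          throw new CorrectnessIssue(
            s"Detected pattern of possible 'correctness' issue due to global watermark (at ${op.name})")
      }
    } catch {
      // With the check disabled, the exception is only reported as a warning
      case e: CorrectnessIssue if !failWhenDetected => warn(e.getMessage)
    }
}
```

Like the description above, the sketch stops at the first offending operator: a single exception is thrown (or a single warning printed) even if the plan contains several such pairs.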

isStatefulOperation

isStatefulOperation(
  p: LogicalPlan): Boolean

isStatefulOperation is positive (true) for the following logical operators:

Otherwise, isStatefulOperation is negative (false).

isStatefulOperationPossiblyEmitLateRows

isStatefulOperationPossiblyEmitLateRows(
  p: LogicalPlan): Boolean

isStatefulOperationPossiblyEmitLateRows is positive (true) for the following logical operators:

Otherwise, isStatefulOperationPossiblyEmitLateRows is negative (false).

UnsupportedOperationsSuite

Review UnsupportedOperationsSuite in the source code of Apache Spark.

$ SBT_MAVEN_PROFILES="-Pyarn,kubernetes,hive,hive-thriftserver,scala-2.13,hadoop-cloud" \
  sbt "catalyst/testOnly org.apache.spark.sql.catalyst.analysis.UnsupportedOperationsSuite"
...
[info] Run completed in 4 seconds, 27 milliseconds.
[info] Total number of tests run: 183
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 183, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Learn more in the official documentation of Apache Spark.