
Except Logical Operator

Except is a SetOperation binary logical operator that represents the following high-level operators in a logical plan:

- EXCEPT [ALL] (and MINUS [ALL]) SQL statements
- Dataset.except and Dataset.exceptAll operators
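
A quick way to see this in spark-shell (a minimal sketch over ad-hoc ranges; the datasets are arbitrary):

import org.apache.spark.sql.catalyst.plans.logical.Except

// Dataset.except and Dataset.exceptAll both put an Except operator
// at the root of the logical plan (before analysis and optimization)
val q = spark.range(2).except(spark.range(1))
assert(q.queryExecution.logical.isInstanceOf[Except])

val qAll = spark.range(2).exceptAll(spark.range(1))
assert(qAll.queryExecution.logical.isInstanceOf[Except])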

Creating Instance

Except takes the following to be created:

- Left logical operator
- Right logical operator
- isAll flag

Except is created when:

- Dataset.except and Dataset.exceptAll operators are used
- AstBuilder is requested to parse EXCEPT and MINUS SQL statements
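
For exploration, the operator can also be created directly (a sketch that assumes the Spark 2.4-era UnresolvedRelation(TableIdentifier) constructor; the table names a and b are made up):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.Except

// left child, right child and the isAll flag
val op = Except(
  UnresolvedRelation(TableIdentifier("a")),
  UnresolvedRelation(TableIdentifier("b")),
  isAll = false)
assert(!op.isAll)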

Logical Optimization

Except is supposed to be resolved (rewritten) to other logical operators during the logical optimization phase, i.e. Except should no longer be part of a logical plan after logical optimization.

The BasicOperators execution planning strategy throws an IllegalStateException if the conversion did not happen.
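
That is easy to verify in spark-shell (a minimal sketch): optimize any except query and check that no Except node survived.

import org.apache.spark.sql.catalyst.plans.logical.Except

val q = spark.range(2).except(spark.range(1))
// The optimizer has replaced Except with other operators
assert(q.queryExecution.optimizedPlan.collect { case e: Except => e }.isEmpty)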

Target Logical Operators, Optimization Rules and Demos:

- Left-Anti Join: Except (DISTINCT) in the ReplaceExceptWithAntiJoin logical optimization rule (Demo: Except Operator Replaced with Left-Anti Join)
- Filter: Except (DISTINCT) in the ReplaceExceptWithFilter logical optimization rule (Demo: Except Operator Replaced with Filter Operator)
- Union, Aggregate and Generate: Except (ALL) in the RewriteExceptAll logical optimization rule (Demo: Except (All) Operator Replaced with Union, Aggregate and Generate Operators)

Catalyst DSL

except(
  otherPlan: LogicalPlan,
  isAll: Boolean): LogicalPlan

Catalyst DSL defines the except extension method to create an Except logical operator (e.g. for testing or Spark SQL internals exploration).

import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("a").except(table("b"), isAll = false)
scala> println(plan.numberedTreeString)
00 'Except false
01 :- 'UnresolvedRelation `a`
02 +- 'UnresolvedRelation `b`

import org.apache.spark.sql.catalyst.plans.logical.Except
val op = plan.p(0)
assert(op.isInstanceOf[Except])
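
Passing isAll = true models EXCEPT ALL instead (a one-line sketch; the root node then carries the flag):

val planAll = table("a").except(table("b"), isAll = true)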

Except Only on Relations with Same Number of Columns

The Except logical operator can only be performed on tables (relations) with the same number of columns; otherwise the analyzer throws an AnalysisException.
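
The following sketch reproduces the error (the tables mirror the except_left and except_right relations in the output below, with except_right carrying an extra column):

Seq((0, "zero", "000"))
  .toDF("id", "name", "triple")
  .write
  .mode("overwrite")
  .saveAsTable("except_left")
val left = spark.table("except_left")

Seq((1, "one", "111", "extra"))
  .toDF("id", "name", "triple", "extra_column")
  .write
  .mode("overwrite")
  .saveAsTable("except_right")
val right = spark.table("except_right")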

scala> left.except(right)
org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the same number of columns, but the first table has 3 columns and the second table has 4 columns;;
'Except false
:- SubqueryAlias `default`.`except_left`
:  +- Relation[id#16,name#17,triple#18] parquet
+- SubqueryAlias `default`.`except_right`
   +- Relation[id#181,name#182,triple#183,extra_column#184] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:43)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$16(CheckAnalysis.scala:288)
  ...

Demo

Except Operator Replaced with Left-Anti Join

Seq((0, "zero", "000"), (1, "one", "111"))
  .toDF("id", "name", "triple")
  .write
  .saveAsTable("except_left")
val left = spark.table("except_left")

// The number of rows differs
Seq((1, "one", "111"))
  .toDF("id", "name", "triple")
  .write
  .mode("overwrite")
  .saveAsTable("except_right")
val right = spark.table("except_right")

val q = left.except(right)

// SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
// ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1<=>b1 AND a2<=>b2

// Note the use of the <=> null-safe equality operator
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [id#16, name#17, triple#18], [id#16, name#17, triple#18]
01 +- Join LeftAnti, (((id#16 <=> id#209) && (name#17 <=> name#210)) && (triple#18 <=> triple#211))
02    :- Relation[id#16,name#17,triple#18] parquet
03    +- Relation[id#209,name#210,triple#211] parquet
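
As a sanity check (a sketch), the rewritten query still has EXCEPT semantics: only the row absent from except_right survives.

import org.apache.spark.sql.Row

// (1, "one", "111") is in both tables, so only (0, "zero", "000") remains
assert(q.collect.toSeq == Seq(Row(0, "zero", "000")))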

Except Operator Replaced with Filter Operator

Seq((0, "zero", "000"), (1, "one", "111"))
  .toDF("id", "name", "triple")
  .write
  .saveAsTable("except_left")
val t1 = spark.table("except_left")

val left = t1.where(length($"name") === 3)
val right = t1.where($"id" === 0)

// SELECT a1, a2 FROM Tab1 WHERE a2 = 12 EXCEPT SELECT a1, a2 FROM Tab1 WHERE a1 = 5
// ==>  SELECT DISTINCT a1, a2 FROM Tab1 WHERE a2 = 12 AND (a1 is null OR a1 <> 5)
val q = left.except(right)

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [id#16, name#17, triple#18], [id#16, name#17, triple#18]
01 +- Filter ((length(name#17) = 3) && NOT coalesce((id#16 = 0), false))
02    +- Relation[id#16,name#17,triple#18] parquet
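
A quick structural check (a sketch) confirms that ReplaceExceptWithFilter, not ReplaceExceptWithAntiJoin, kicked in: the optimized plan has a Filter but no Join.

import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join}

val optimized = q.queryExecution.optimizedPlan
assert(optimized.collect { case f: Filter => f }.nonEmpty)
assert(optimized.collect { case j: Join => j }.isEmpty)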

Except (All) Operator Replaced with Union, Aggregate and Generate Operators

Seq((0, "zero", "000"), (1, "one", "111"))
  .toDF("id", "name", "triple")
  .write
  .saveAsTable("except_left")
val left = spark.table("except_left")

// The number of rows differs
Seq((1, "one", "111"))
  .toDF("id", "name", "triple")
  .write
  .mode("overwrite")
  .saveAsTable("except_right")
val right = spark.table("except_right")

// SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
// ==>  tag left rows with vcol = 1 and right rows with vcol = -1, UNION ALL them,
//      sum vcol per row group, keep groups with sum > 0, and replicate each
//      surviving row sum-many times (replicaterows)
val q = left.exceptAll(right)

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Project [id#16, name#17, triple#18]
01 +- Generate replicaterows(sum#227L, id#16, name#17, triple#18), [3], false, [id#16, name#17, triple#18]
02    +- Filter (isnotnull(sum#227L) && (sum#227L > 0))
03       +- Aggregate [id#16, name#17, triple#18], [id#16, name#17, triple#18, sum(vcol#224L) AS sum#227L]
04          +- Union
05             :- Project [1 AS vcol#224L, id#16, name#17, triple#18]
06             :  +- Relation[id#16,name#17,triple#18] parquet
07             +- Project [-1 AS vcol#225L, id#209, name#210, triple#211]
08                +- Relation[id#209,name#210,triple#211] parquet
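
And the matching structural check (a sketch) for the EXCEPT ALL rewrite: Except is gone, while Generate and Union are in the optimized plan.

import org.apache.spark.sql.catalyst.plans.logical.{Except, Generate, Union}

val optimized = q.queryExecution.optimizedPlan
assert(optimized.collect { case e: Except => e }.isEmpty)
assert(optimized.collect { case g: Generate => g }.nonEmpty)
assert(optimized.collect { case u: Union => u }.nonEmpty)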