Skip to content

Deduplicate Unary Logical Operator

Deduplicate is a unary logical operator that represents dropDuplicates operator.

Deduplicate has <> flag enabled for streaming Datasets.

val uniqueRates = spark.
  readStream.
  format("rate").
  load.
  dropDuplicates("value")  // <-- creates Deduplicate logical operator
// Note the streaming flag
scala> println(uniqueRates.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#33L], true  // <-- streaming flag enabled
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#32, value#33L]

CAUTION: FIXME Example with duplicates across batches to show that Deduplicate keeps state and withWatermark operator should also be used to limit how much is stored (to not cause OOM)

Note

UnsupportedOperationChecker ensures that dropDuplicates operator is not used after aggregation on streaming Datasets.

The following code is not supported in Structured Streaming and results in an AnalysisException.

val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count").
  dropDuplicates  // <-- after groupBy

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start
org.apache.spark.sql.AnalysisException: dropDuplicates is not supported after aggregation on a streaming DataFrame/Dataset;;

Note

Deduplicate logical operator is translated (planned) to:

  • StreamingDeduplicateExec physical operator in StreamingDeduplicationStrategy execution planning strategy for streaming Datasets (aka streaming plans)

  • Aggregate physical operator in ReplaceDeduplicateWithAggregate execution planning strategy for non-streaming/batch Datasets (batch plans)

[[output]] The output schema of Deduplicate is exactly the <>'s output schema.

Creating Instance

Deduplicate takes the following when created:

  • [[keys]] Attributes for keys
  • [[child]] Child logical operator (i.e. LogicalPlan)
  • [[streaming]] Flag whether the logical operator is for streaming (enabled) or batch (disabled) mode