Deduplicate Unary Logical Operator
Deduplicate is a unary logical operator that represents the dropDuplicates operator.
Deduplicate has the streaming flag enabled for streaming Datasets.
```scala
val uniqueRates = spark.
  readStream.
  format("rate").
  load.
  dropDuplicates("value") // <-- creates Deduplicate logical operator

// Note the streaming flag
scala> println(uniqueRates.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#33L], true // <-- streaming flag enabled
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#32, value#33L]
```
CAUTION: Deduplicate keeps state across micro-batches (a duplicate is dropped even if it arrives in a later batch), so the withWatermark operator should also be used to limit how much state is stored and avoid an OutOfMemoryError.
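A small conceptual model in plain Scala (no Spark dependency) can make the cross-batch behavior concrete. It is a sketch under stated assumptions, not Spark's StreamingDeduplicateExec: DedupModel, Event and DedupState are hypothetical names, the state is an in-memory Map rather than a state store, the watermark is simply the maximum event time seen so far minus a fixed delay, and rows are deduplicated on value alone (the Structured Streaming guide recommends including the event-time column among the deduplication keys so that old state can be dropped).

```scala
// Conceptual model only (hypothetical names), not Spark's StreamingDeduplicateExec.
object DedupModel {
  final case class Event(value: Long, eventTimeMs: Long)

  final class DedupState(delayMs: Long) {
    // Deduplication key (value) -> event time of its first occurrence.
    private var seen = Map.empty[Long, Long]
    private var maxEventTimeMs = Long.MinValue
    // No watermark until the first non-empty batch has been processed.
    private var watermarkMs = Long.MinValue

    def stateSize: Int = seen.size

    // Processes one micro-batch and returns the rows that are emitted.
    def processBatch(batch: Seq[Event]): Seq[Event] = {
      // 1. Drop rows that arrive later than the current watermark.
      val onTime = batch.filter(_.eventTimeMs >= watermarkMs)

      // 2. Emit only the first occurrence of each key, across all batches.
      val emitted = Vector.newBuilder[Event]
      onTime.foreach { e =>
        if (!seen.contains(e.value)) {
          seen += (e.value -> e.eventTimeMs)
          emitted += e
        }
      }

      // 3. Advance the watermark (it takes effect from the next batch on).
      if (batch.nonEmpty) {
        maxEventTimeMs = math.max(maxEventTimeMs, batch.map(_.eventTimeMs).max)
        watermarkMs = maxEventTimeMs - delayMs
      }

      // 4. Evict keys whose first occurrence is older than the watermark.
      seen = seen.filter { case (_, t) => t >= watermarkMs }
      emitted.result()
    }
  }
}
```

With a 10 ms delay, Event(1, 110) in a second batch is dropped as a duplicate of Event(1, 100) from the first batch; once the watermark passes 100 the key is evicted, so Event(1, 115) in a third batch is emitted again. Without step 4, the seen map would grow with every new key.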
Note
UnsupportedOperationChecker ensures that the dropDuplicates operator is not used after an aggregation on streaming Datasets.

The following code is not supported in Structured Streaming and results in an AnalysisException.
```scala
// window and count standard functions
import org.apache.spark.sql.functions.{count, window}

val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count").
  dropDuplicates // <-- after groupBy

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start
org.apache.spark.sql.AnalysisException: dropDuplicates is not supported after aggregation on a streaming DataFrame/Dataset;;
```
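The rule can be pictured with a toy plan tree. The sketch below is a hypothetical model in plain Scala, not Spark's Catalyst classes or UnsupportedOperationChecker itself (DedupCheckModel, Plan, Source, Aggregate, Dedup and check are illustrative names): it flags a deduplication node whose subtree contains an aggregation over a streaming source, reusing the error message shown above.

```scala
// Hypothetical, simplified logical plan tree (not Spark's Catalyst classes).
object DedupCheckModel {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Source(streaming: Boolean) extends Plan { def children: Seq[Plan] = Nil }
  final case class Aggregate(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Dedup(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  // A plan is streaming if any of its sources is streaming.
  def isStreaming(p: Plan): Boolean = p match {
    case Source(streaming) => streaming
    case other             => other.children.exists(isStreaming)
  }

  // True if the subtree rooted at p contains an aggregation over a streaming source.
  def hasStreamingAggregate(p: Plan): Boolean = p match {
    case a: Aggregate if isStreaming(a) => true
    case other                          => other.children.exists(hasStreamingAggregate)
  }

  // Walks the tree top-down and returns the first violation, if any.
  def check(plan: Plan): Option[String] = plan match {
    case d: Dedup if hasStreamingAggregate(d.child) =>
      Some("dropDuplicates is not supported after aggregation on a streaming DataFrame/Dataset")
    case other =>
      other.children.iterator.map(check).collectFirst { case Some(err) => err }
  }
}
```

In this model, moving the deduplication below the aggregation (Aggregate(Dedup(...))) passes the check, and so does any plan over a batch source.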
Note
Deduplicate logical operator is translated to:
- StreamingDeduplicateExec physical operator by the StreamingDeduplicationStrategy execution planning strategy for streaming Datasets (aka streaming plans)
- Aggregate logical operator by the ReplaceDeduplicateWithAggregate logical optimization for non-streaming/batch Datasets (aka batch plans); the Aggregate is then planned as a regular aggregation
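For batch plans the replacement rests on a simple equivalence: deduplicating on some keys gives the same rows as grouping by those keys and keeping the first value of every other column (ReplaceDeduplicateWithAggregate uses the first aggregate function for the non-key columns). A minimal plain-Scala sketch of the idea follows; BatchDedupModel, Row and dedupAsAggregate are hypothetical names, and the model keeps the first row it encounters per key, whereas no particular surviving row should be assumed in Spark.

```scala
// Hypothetical model of "dropDuplicates(keys) == GROUP BY keys + first(other columns)".
object BatchDedupModel {
  final case class Row(key: String, payload: Int)

  // Groups rows by the key and keeps the first row seen in each group.
  // groupBy preserves the encounter order within a group, so head is the first row.
  def dedupAsAggregate[R, K](rows: Seq[R])(key: R => K): Seq[R] =
    rows.groupBy(key).values.map(_.head).toSeq
}
```

The order of the groups in the result is unspecified, just as the order of rows after an aggregation is.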
The output schema of Deduplicate is exactly the child logical operator's output schema.
Creating Instance
Deduplicate takes the following when created:
- Attributes for keys
- Child logical operator (i.e. LogicalPlan)
- Flag whether the logical operator is for streaming (enabled) or batch (disabled) mode
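The constructor arguments can be pictured with a stripped-down case class. This is a hypothetical model (DeduplicateModel, Attribute, Relation and plannedAs are illustrative names, not Catalyst's classes) that mirrors the three arguments above, takes its output schema from the child, and uses the streaming flag to decide which operator the Deduplicate becomes.

```scala
// Hypothetical model of the Deduplicate logical operator (not Catalyst's classes).
object DeduplicateModel {
  final case class Attribute(name: String)

  sealed trait LogicalPlan { def output: Seq[Attribute] }

  // Stand-in for a leaf data source with a fixed schema.
  final case class Relation(output: Seq[Attribute]) extends LogicalPlan

  final case class Deduplicate(
      keys: Seq[Attribute], // attributes to deduplicate on
      child: LogicalPlan,   // child logical operator
      streaming: Boolean    // true for streaming, false for batch
  ) extends LogicalPlan {
    // Deduplicate removes rows, never columns: its schema is the child's.
    def output: Seq[Attribute] = child.output
  }

  // Which operator the Deduplicate ends up as, decided by the streaming flag.
  def plannedAs(d: Deduplicate): String =
    if (d.streaming) "StreamingDeduplicateExec (StreamingDeduplicationStrategy)"
    else "Aggregate (ReplaceDeduplicateWithAggregate)"
}
```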