SortExec Unary Physical Operator¶
SortExec is a unary physical operator that, among other use cases, represents Sort logical operators at execution.
Creating Instance¶
SortExec takes the following to be created:

- SortOrder expressions
- global flag
- Child physical operator
- testSpillFrequency (default: 0)
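The following is a minimal sketch, assuming a spark-shell session, that inspects these constructor arguments on a live SortExec (adaptive query execution is switched off so the operator shows up directly in the executed plan):

```scala
// Keep SortExec directly visible in the executed plan (no AQE wrapping).
spark.conf.set("spark.sql.adaptive.enabled", false)

import org.apache.spark.sql.execution.SortExec
val q = Seq((0, "zero"), (1, "one")).toDF("id", "name").sort($"id")
val se = q.queryExecution.executedPlan
  .collectFirst { case s: SortExec => s }
  .get

// The four constructor arguments
println(se.sortOrder)          // SortOrder expressions
println(se.global)             // global flag
println(se.child)              // child physical operator
println(se.testSpillFrequency) // 0 by default
```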
SortExec is created when:

- BasicOperators execution planning strategy is executed (with a Sort logical operator)
- FileFormatWriter utility is used to write out a query result
- EnsureRequirements physical optimization is executed
Performance Metrics¶
| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| peakMemory | peak memory | |
| sortTime | sort time | |
| spillSize | spill size | |
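A minimal sketch of reading these metrics after execution, again assuming a spark-shell session with adaptive query execution off:

```scala
spark.conf.set("spark.sql.adaptive.enabled", false)

import org.apache.spark.sql.execution.SortExec
val q = spark.range(100000).sort($"id".desc)
q.collect() // trigger execution so the metrics get populated

val se = q.queryExecution.executedPlan
  .collectFirst { case s: SortExec => s }
  .get

// metrics is a Map[String, SQLMetric]; name is what the web UI displays.
se.metrics.foreach { case (key, metric) =>
  println(s"$key (${metric.name.getOrElse(key)}): ${metric.value}")
}
```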
Radix Sort¶
SortExec operator uses the spark.sql.sort.enableRadixSort configuration property when creating an UnsafeExternalRowSorter.
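A minimal sketch of toggling the property for the current session (it is enabled by default):

```scala
// With the property off, UnsafeExternalRowSorter does not use radix-based
// sorting (which otherwise applies when the sort order supports it).
spark.conf.set("spark.sql.sort.enableRadixSort", false)
assert(spark.conf.get("spark.sql.sort.enableRadixSort") == "false")
```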
BlockingOperatorWithCodegen¶
SortExec is a BlockingOperatorWithCodegen.
CodegenSupport¶
SortExec supports Java code generation (indirectly, as a BlockingOperatorWithCodegen).
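Since SortExec participates in whole-stage code generation, the generated Java code can be inspected with the debug helpers. A minimal sketch (spark-shell):

```scala
import org.apache.spark.sql.execution.debug._

val q = spark.range(10).sort($"id".desc)
// Prints the Java code generated for every whole-stage-codegen subtree.
q.debugCodegen
```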
Output Data Ordering Requirements¶
outputOrdering: Seq[SortOrder]
outputOrdering is the given SortOrder expressions.

outputOrdering is part of the SparkPlan abstraction.
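A minimal sketch, assuming a spark-shell session with adaptive query execution off, showing that the output ordering is exactly the sortOrder constructor argument:

```scala
spark.conf.set("spark.sql.adaptive.enabled", false)

import org.apache.spark.sql.execution.SortExec
val q = Seq((0, "zero"), (1, "one")).toDF("id", "name").sort($"id")
val se = q.queryExecution.executedPlan
  .collectFirst { case s: SortExec => s }
  .get

// outputOrdering simply re-exposes the given SortOrder expressions.
assert(se.outputOrdering == se.sortOrder)
```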
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is an OrderedDistribution (with the SortOrder expressions) when the global flag is enabled, or an UnspecifiedDistribution otherwise.

requiredChildDistribution is part of the SparkPlan abstraction.
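A minimal sketch of that selection logic (a simplified rendering of what the operator does, not the verbatim Spark source):

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{
  Distribution, OrderedDistribution, UnspecifiedDistribution}

// A global sort needs the child's rows range-partitioned by the sort
// expressions; a partition-local sort accepts any distribution.
def requiredChildDistribution(
    sortOrder: Seq[SortOrder],
    global: Boolean): Seq[Distribution] =
  if (global) OrderedDistribution(sortOrder) :: Nil
  else UnspecifiedDistribution :: Nil
```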
Physical Optimizations¶
OptimizeSkewedJoin¶
OptimizeSkewedJoin physical optimization is used to optimize skewed SortMergeJoinExecs (with SortExec operators) in Adaptive Query Execution.
RemoveRedundantSorts¶
SortExec operators can be removed from a physical query plan by the RemoveRedundantSorts physical optimization (with spark.sql.execution.removeRedundantSorts enabled).
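A minimal sketch of toggling the optimization for the current session (the property is enabled by default):

```scala
// With the property off, otherwise-redundant SortExec operators stay in
// the physical plan (handy when studying query plans).
spark.conf.set("spark.sql.execution.removeRedundantSorts", false)
```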
Creating UnsafeExternalRowSorter¶
createSorter(): UnsafeExternalRowSorter
createSorter ...FIXME

createSorter is used when:

- SortExec is requested to execute
- FileFormatWriter utility is used to write out a query result
Demo¶
val q = Seq((0, "zero"), (1, "one")).toDF("id", "name").sort('id)
val qe = q.queryExecution

val logicalPlan = qe.analyzed
scala> println(logicalPlan.numberedTreeString)
00 Sort [id#72 ASC NULLS FIRST], true
01 +- Project [_1#69 AS id#72, _2#70 AS name#73]
02    +- LocalRelation [_1#69, _2#70]

// BasicOperators does the conversion of Sort logical operator to SortExec
val sparkPlan = qe.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 Sort [id#72 ASC NULLS FIRST], true, 0
01 +- LocalTableScan [id#72, name#73]

// SortExec supports Whole-Stage Code Generation
val executedPlan = qe.executedPlan
scala> println(executedPlan.numberedTreeString)
00 *(1) Sort [id#72 ASC NULLS FIRST], true, 0
01 +- Exchange rangepartitioning(id#72 ASC NULLS FIRST, 200)
02    +- LocalTableScan [id#72, name#73]

import org.apache.spark.sql.execution.SortExec
val sortExec = executedPlan.collect { case se: SortExec => se }.head
assert(sortExec.isInstanceOf[SortExec])