SortExec Physical Operator¶
SortExec is a unary physical operator that (most importantly) represents Sort logical operator at execution.
Creating Instance¶
SortExec takes the following to be created:
- SortOrder expressions
-
globalflag - Child physical operator
-
testSpillFrequency
SortExec is created when:
- BasicOperators execution planning strategy is executed (with a Sort logical operator)
FileFormatWriterutility is used to write out a query result- EnsureRequirements physical optimization is executed
Performance Metrics¶
peak memory¶
sort time¶
spill size¶
Number of in-memory bytes spilled by this operator at execution (while an UnsafeExternalRowSorter was sorting the rows in a partition)
The spill size metric is computed using TaskMetrics (Spark Core) and is a difference of the metric before and after sorting.
Radix Sort¶
SortExec operator uses the spark.sql.sort.enableRadixSort configuration property when creating an UnsafeExternalRowSorter.
BlockingOperatorWithCodegen¶
SortExec is a BlockingOperatorWithCodegen.
CodegenSupport¶
SortExec supports Java code generation (indirectly as a BlockingOperatorWithCodegen).
Output Data Ordering Requirements¶
outputOrdering: Seq[SortOrder]
outputOrdering is the given SortOrder expressions.
outputOrdering is part of the SparkPlan abstraction.
Required Child Output Distribution¶
requiredChildDistribution: Seq[Distribution]
requiredChildDistribution is a OrderedDistribution (with the SortOrder expressions) with the global flag enabled or a UnspecifiedDistribution.
requiredChildDistribution is part of the SparkPlan abstraction.
Physical Optimizations¶
OptimizeSkewedJoin¶
OptimizeSkewedJoin physical optimization is used to optimize skewed SortMergeJoinExecs (with SortExec operators) in Adaptive Query Execution.
RemoveRedundantSorts¶
SortExec operators can be removed from a physical query plan by RemoveRedundantSorts physical optimization (with spark.sql.execution.removeRedundantSorts enabled).
Creating UnsafeExternalRowSorter¶
createSorter(): UnsafeExternalRowSorter
createSorter creates a BaseOrdering for the sortOrders and the output schema.
createSorter uses spark.sql.sort.enableRadixSort configuration property to enable radix sort when possible.
Radix Sort, Sort Order and Supported Data Types
Radix sort can be used when there is exactly one sortOrder that can be satisfied (based on the data type) with a radix sort on the prefix.
The following data types are supported:
AnsiIntervalTypeBooleanTypeByteTypeDateTypeDecimalType(up to18precision digits)DoubleTypeFloatTypeIntegerTypeLongTypeShortTypeTimestampNTZTypeTimestampType
createSorter creates an UnsafeExternalRowSorter with the following:
spark.buffer.pageSize(default:64MB) for a page size- Whether radix sort can be used
- others
createSorter is used when:
SortExecis executed (one per partition)FileFormatWriterutility is used to write out a query result
Demo¶
val q = Seq((0, "zero"), (1, "one")).toDF("id", "name").sort('id)
val qe = q.queryExecution
val logicalPlan = qe.analyzed
scala> println(logicalPlan.numberedTreeString)
00 Sort [id#72 ASC NULLS FIRST], true
01 +- Project [_1#69 AS id#72, _2#70 AS name#73]
02 +- LocalRelation [_1#69, _2#70]
// BasicOperators does the conversion of Sort logical operator to SortExec
val sparkPlan = qe.sparkPlan
scala> println(sparkPlan.numberedTreeString)
00 Sort [id#72 ASC NULLS FIRST], true, 0
01 +- LocalTableScan [id#72, name#73]
// SortExec supports Whole-Stage Code Generation
val executedPlan = qe.executedPlan
scala> println(executedPlan.numberedTreeString)
00 *(1) Sort [id#72 ASC NULLS FIRST], true, 0
01 +- Exchange rangepartitioning(id#72 ASC NULLS FIRST, 200)
02 +- LocalTableScan [id#72, name#73]
import org.apache.spark.sql.execution.SortExec
val sortExec = executedPlan.collect { case se: SortExec => se }.head
assert(sortExec.isInstanceOf[SortExec])