ShufflePartitionsUtil
coalescePartitions
coalescePartitions(
  mapOutputStatistics: Seq[Option[MapOutputStatistics]],
  inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
  advisoryTargetSize: Long,
  minNumPartitions: Int,
  minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]]
coalescePartitions does nothing and returns an empty result when mapOutputStatistics is empty.
coalescePartitions calculates the total shuffle bytes (totalPostShuffleInputSize) by summing up the bytesByPartitionId (of a shuffleId) of every defined MapOutputStatistics in mapOutputStatistics.
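A minimal sketch of this sum in Scala. It assumes a hypothetical `MapOutputStats` case class as a stand-in for Spark's `MapOutputStatistics`, modeling only the `shuffleId` and `bytesByPartitionId` fields. Entries that are `None` contribute nothing to the total.

```scala
// Hypothetical stand-in for org.apache.spark.MapOutputStatistics:
// only the two fields used in this sketch are modeled.
final case class MapOutputStats(shuffleId: Int, bytesByPartitionId: Array[Long])

object TotalShuffleBytes {
  // Sums bytesByPartitionId of every defined entry.
  // flatMap over each Option drops the None entries.
  def totalPostShuffleInputSize(stats: Seq[Option[MapOutputStats]]): Long =
    stats.flatMap(_.map(_.bytesByPartitionId.sum)).sum
}
```

For example, two defined statistics with partition sizes `10, 20` and `5` plus one `None` give a total of 35 bytes.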
coalescePartitions calculates the maximum target size (maxTargetSize) as the ratio of the total shuffle bytes to the given minNumPartitions.
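The ratio can be sketched as follows. Rounding up to a whole number of bytes is an assumption of this sketch, and the `require` guard against a non-positive `minNumPartitions` is added here for safety only.

```scala
object MaxTargetSize {
  // Total shuffle bytes divided by minNumPartitions,
  // rounded up (an assumption of this sketch) to a whole number of bytes.
  def maxTargetSize(totalPostShuffleInputSize: Long, minNumPartitions: Int): Long = {
    require(minNumPartitions > 0, "minNumPartitions must be positive")
    math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
  }
}
```

With 1000 total shuffle bytes and `minNumPartitions` of 3, the sketch gives a maximum target size of 334 bytes.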
coalescePartitions determines the target size (targetSize) as the smaller of maxTargetSize and the given advisoryTargetSize, but never smaller than the given minPartitionSize.
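A sketch of how the three sizes could combine, assuming the target size is capped by both `maxTargetSize` and `advisoryTargetSize` and then raised to at least `minPartitionSize`:

```scala
object TargetSize {
  // Take the smaller of maxTargetSize and advisoryTargetSize,
  // then never go below minPartitionSize.
  def targetSize(maxTargetSize: Long, advisoryTargetSize: Long, minPartitionSize: Long): Long =
    maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
}
```

The order matters: applying the `minPartitionSize` floor last means it wins even when a large `minNumPartitions` pushes `maxTargetSize` below it. With an illustrative 64 MiB advisory size and a 1 MiB minimum, a 10 MiB `maxTargetSize` is used as-is, a 100 MiB one is capped to 64 MiB, and a 334-byte one is raised to 1 MiB.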
coalescePartitions prints out the following INFO message to the logs:
For shuffle([shuffleIds]), advisory target size: [advisoryTargetSize],
actual target size [targetSize], minimum partition size: [minPartitionSize]
In the end, coalescePartitions uses coalescePartitionsWithoutSkew when all the given inputPartitionSpecs are empty, or coalescePartitionsWithSkew otherwise.
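The choice between the two paths reduces to a single `forall` check. In this sketch, `PartitionSpec` is a hypothetical stand-in for `ShufflePartitionSpec`, and the `Strategy` values are illustrative labels rather than Spark types; the real method calls `coalescePartitionsWithoutSkew` or `coalescePartitionsWithSkew` instead of returning a label.

```scala
// Hypothetical stand-in for ShufflePartitionSpec; only used as an element type here.
final case class PartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

object CoalesceDispatch {
  // Illustrative labels for the two code paths.
  sealed trait Strategy
  case object WithoutSkew extends Strategy
  case object WithSkew extends Strategy

  // If every entry of inputPartitionSpecs is None, no partition specs were
  // supplied, so the path without skew handling is chosen.
  def choose(inputPartitionSpecs: Seq[Option[Seq[PartitionSpec]]]): Strategy =
    if (inputPartitionSpecs.forall(_.isEmpty)) WithoutSkew else WithSkew
}
```

A single defined entry is enough to select the skew-aware path.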
inputPartitionSpecs can be empty for ShuffleQueryStageExecs and defined for AQEShuffleReadExecs.
coalescePartitions is used when:
- CoalesceShufflePartitions adaptive physical optimization is executed
Logging
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.ShufflePartitionsUtil.name = org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
logger.ShufflePartitionsUtil.level = all
Refer to Logging.