# ShufflePartitionsUtil
## coalescePartitions
```scala
coalescePartitions(
  mapOutputStatistics: Seq[Option[MapOutputStatistics]],
  inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
  advisoryTargetSize: Long,
  minNumPartitions: Int,
  minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]]
```
coalescePartitions does nothing and returns an empty result when mapOutputStatistics is empty.
coalescePartitions calculates the total shuffle bytes (totalPostShuffleInputSize) by summing up the bytesByPartitionId (of a shuffleId) of every MapOutputStatistics (in mapOutputStatistics).
coalescePartitions calculates the maximum target size (maxTargetSize) as the total shuffle bytes divided by the given minNumPartitions, rounded up.
coalescePartitions determines the target size (targetSize) as the smaller of maxTargetSize and the given advisoryTargetSize, but never smaller than the given minPartitionSize.
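The target-size arithmetic above can be sketched in plain Scala as follows. This is a minimal sketch under stated assumptions, not Spark's code: MapStats is a hypothetical stand-in for MapOutputStatistics (keeping only shuffleId and bytesByPartitionId), and TargetSizeSketch.targetSize is a hypothetical helper that returns None for the empty case instead of an empty result.

```scala
// Hypothetical stand-in for MapOutputStatistics (only the fields this sketch needs).
final case class MapStats(shuffleId: Int, bytesByPartitionId: Array[Long])

object TargetSizeSketch {
  // Hypothetical helper: computes targetSize, or None when there are no statistics.
  def targetSize(
      mapOutputStatistics: Seq[Option[MapStats]],
      advisoryTargetSize: Long,
      minNumPartitions: Int,
      minPartitionSize: Long): Option[Long] =
    if (mapOutputStatistics.isEmpty) None
    else {
      // Total shuffle bytes across all available statistics (None entries are skipped).
      val totalPostShuffleInputSize =
        mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
      // Total bytes divided by minNumPartitions, rounded up.
      val maxTargetSize =
        math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
      // Capped by the advisory size, but never below minPartitionSize.
      Some(maxTargetSize.min(advisoryTargetSize).max(minPartitionSize))
    }
}
```

For example, with two shuffles of 60 and 40 bytes (100 in total) and a minNumPartitions of 3, maxTargetSize is ceil(100 / 3) = 34, so an advisoryTargetSize of 64 gives a targetSize of 34, unless a larger minPartitionSize (say, 50) raises it.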
coalescePartitions prints out the following INFO message to the logs:

```text
For shuffle([shuffleIds]), advisory target size: [advisoryTargetSize],
actual target size [targetSize], minimum partition size: [minPartitionSize]
```
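The [shuffleIds] placeholder stands for the shuffle IDs of the available statistics. A minimal sketch of how such a message could be assembled, assuming the IDs are joined with ", " (MapStats and LogMessageSketch are hypothetical names, not Spark API):

```scala
// Hypothetical stand-in for MapOutputStatistics.
final case class MapStats(shuffleId: Int, bytesByPartitionId: Array[Long])

object LogMessageSketch {
  // Builds the INFO message text; None entries contribute no shuffle ID.
  def message(
      mapOutputStatistics: Seq[Option[MapStats]],
      advisoryTargetSize: Long,
      targetSize: Long,
      minPartitionSize: Long): String = {
    // Assumption: shuffle IDs are comma-separated.
    val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
    s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
      s"actual target size $targetSize, minimum partition size: $minPartitionSize"
  }
}
```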
coalescePartitions then coalesces the partitions using coalescePartitionsWithoutSkew when all the given inputPartitionSpecs are empty, or coalescePartitionsWithSkew otherwise.
Note: inputPartitionSpecs can be empty for ShuffleQueryStageExecs and available for AQEShuffleReadExecs.
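The choice between the two coalescing paths comes down to a single check on inputPartitionSpecs. The sketch below is illustrative only: PartitionSpec, CoalescedSpec and the Path ADT are hypothetical stand-ins for ShufflePartitionSpec and the two Spark methods, and None models an input (e.g. a ShuffleQueryStageExec) without partition specs.

```scala
// Hypothetical stand-ins for ShufflePartitionSpec and one of its variants.
sealed trait PartitionSpec
final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec

object DispatchSketch {
  // Hypothetical labels for the two coalescing paths.
  sealed trait Path
  case object WithoutSkew extends Path
  case object WithSkew extends Path

  // The "without skew" path is taken only when no input carries partition specs.
  def choosePath(inputPartitionSpecs: Seq[Option[Seq[PartitionSpec]]]): Path =
    if (inputPartitionSpecs.forall(_.isEmpty)) WithoutSkew else WithSkew
}
```

A single input with specs (e.g. from an AQEShuffleReadExec) is enough to switch to the skew-aware path.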
coalescePartitions is used when:
- CoalesceShufflePartitions adaptive physical optimization is executed
## Logging
Enable ALL logging level for org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil logger to see what happens inside.
Add the following lines to conf/log4j2.properties:

```text
logger.ShufflePartitionsUtil.name = org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
logger.ShufflePartitionsUtil.level = all
```
Refer to Logging.