ShufflePartitionsUtil

coalescePartitions

coalescePartitions(
  mapOutputStatistics: Seq[Option[MapOutputStatistics]],
  inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
  advisoryTargetSize: Long,
  minNumPartitions: Int,
  minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]]

coalescePartitions does nothing and returns an empty result when the given mapOutputStatistics is empty.

coalescePartitions calculates the total shuffle bytes (totalPostShuffleInputSize) by summing up the bytesByPartitionId (of a shuffleId) for every MapOutputStatistics (in mapOutputStatistics).
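The summation can be sketched as follows. This is a minimal illustration, not the Spark code itself: MapOutputStats is a hypothetical stand-in for Spark's MapOutputStatistics so that the snippet runs without Spark on the classpath, and entries that are None are assumed to contribute nothing to the total.

```scala
// Hypothetical stand-in for Spark's MapOutputStatistics (illustration only)
final case class MapOutputStats(shuffleId: Int, bytesByPartitionId: Array[Long])

object TotalShuffleBytesSketch {
  // Sums bytesByPartitionId of every defined entry; None entries are skipped
  def totalPostShuffleInputSize(stats: Seq[Option[MapOutputStats]]): Long =
    stats.flatMap(_.map(_.bytesByPartitionId.sum)).sum
}
```

For example, two shuffles with partition sizes (10, 20) and (5, 25) bytes and one undefined entry in between give a total of 60 bytes.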

coalescePartitions calculates the maximum target size (maxTargetSize) as the total shuffle bytes divided by the given minNumPartitions (rounded up).

coalescePartitions determines the target size (targetSize) as the smaller of maxTargetSize and the given advisoryTargetSize, but never smaller than the given minPartitionSize.
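A worked sketch of the target size arithmetic may help here. It assumes (based on a reading of the Spark source, not on this page) that maxTargetSize is the total divided by minNumPartitions and rounded up, and that the final target is capped by advisoryTargetSize and then raised to at least minPartitionSize. TargetSizeSketch and its parameter named total are illustrative names only.

```scala
object TargetSizeSketch {
  def targetSize(
      total: Long,               // total shuffle bytes (totalPostShuffleInputSize)
      advisoryTargetSize: Long,
      minNumPartitions: Int,
      minPartitionSize: Long): Long = {
    // Upper bound so that at least minNumPartitions partitions can be produced
    val maxTargetSize = math.ceil(total / minNumPartitions.toDouble).toLong
    // Cap by the advisory size, then never go below the minimum partition size
    maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
  }
}
```

With 1000 total bytes and an advisory size of 64, a minNumPartitions of 10 leaves the advisory size in effect (64), a minNumPartitions of 100 lowers the target to 10, and a minNumPartitions of 1000 with a minPartitionSize of 16 is lifted back up to 16.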

coalescePartitions prints out the following INFO message to the logs:

For shuffle([shuffleIds]), advisory target size: [advisoryTargetSize],
actual target size [targetSize], minimum partition size: [minPartitionSize]

coalescePartitions uses coalescePartitionsWithoutSkew when all the given inputPartitionSpecs are empty, and coalescePartitionsWithSkew otherwise.
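The choice between the two helpers can be sketched as a single check over inputPartitionSpecs. This is a simplified illustration: String stands in for ShufflePartitionSpec, the helper names are returned as plain strings rather than called, and "empty" is assumed to mean that every entry is None.

```scala
object DispatchSketch {
  // Hypothetical: String stands in for ShufflePartitionSpec
  def choosePath(inputPartitionSpecs: Seq[Option[Seq[String]]]): String =
    if (inputPartitionSpecs.forall(_.isEmpty)) "coalescePartitionsWithoutSkew"
    else "coalescePartitionsWithSkew"
}
```

Under this assumption a single defined entry is enough to take the coalescePartitionsWithSkew path, even when the other entries are None.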

inputPartitionSpecs

inputPartitionSpecs can be empty for ShuffleQueryStageExecs and available for AQEShuffleReadExecs.

coalescePartitions is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.ShufflePartitionsUtil.name = org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
logger.ShufflePartitionsUtil.level = all

Refer to Logging.