HashPartitioning¶
HashPartitioning is a Catalyst Partitioning in which rows are distributed across partitions using the Murmur3 hash (of the partitioning expressions) modulo the number of partitions.
HashPartitioning is also an Expression that computes the partition ID so that data distribution is consistent across shuffling and bucketing (for joins of bucketed and regular tables).
Creating Instance¶
HashPartitioning takes the following to be created:
- Partitioning Expressions
- Number of partitions
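The two arguments above can be passed directly to the case class constructor. The following is a minimal sketch, assuming Spark SQL is on the classpath (e.g. in `spark-shell`); the `id` attribute and the value `200` are illustrative choices, not taken from any particular query.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.LongType

// A hypothetical column reference to partition by (illustration only)
val id = AttributeReference("id", LongType)()

// Partitioning expressions first, number of partitions second
val partitioning = HashPartitioning(Seq(id), 200)

// The expression that computes a partition ID for a row
val partitionIdExpr = partitioning.partitionIdExpression
```

In regular queries HashPartitioning is created by Spark SQL itself, so building one by hand like this is mostly useful for exploration.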
HashPartitioning is created when:
- RepartitionByExpression is requested for the partitioning
- RebalancePartitions is requested for the partitioning
- ClusteredDistribution is requested to create a Partitioning
- HashClusteredDistribution is requested to create a Partitioning
- FileSourceScanExec physical operator is requested for the output partitioning
- BucketingUtils utility is used to getBucketIdFromValue
- FileFormatWriter utility is used to write out a query result (with a bucketing spec)
- BroadcastHashJoinExec physical operator is requested to expandOutputPartitioning
Unevaluable¶
HashPartitioning is an Unevaluable expression.
Satisfying Distribution¶
satisfies0(
required: Distribution): Boolean
satisfies0 is positive (true) when either the base satisfies0 holds or the given Distribution meets one of the following conditions:
- For a HashClusteredDistribution, the number of partitioning expressions is the same as the number of expressions of the HashClusteredDistribution, and the two are semantically equal pair-wise
- For a ClusteredDistribution, the partitioning expressions are among the clustering expressions (of the ClusteredDistribution), compared by semantic equality
Otherwise, satisfies0 is negative (false).
satisfies0 is part of the Partitioning abstraction.
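The two conditions can be illustrated with a simplified plain-Scala model. This is a sketch under stated assumptions, not Spark's code: expressions are modeled as strings, `semanticEquals` is a hypothetical stand-in (case-insensitive comparison) for the semantic equality of Catalyst expressions, the `HashClustered` and `Clustered` types are made up for the example, and the base satisfies0 check is left out.

```scala
// Hypothetical, simplified stand-ins for the two distributions
sealed trait RequiredDistribution
final case class HashClustered(expressions: Seq[String]) extends RequiredDistribution
final case class Clustered(clustering: Seq[String]) extends RequiredDistribution

// Assumption: case-insensitive match plays the role of semantic equality
def semanticEquals(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

def satisfies(partitioningExprs: Seq[String], required: RequiredDistribution): Boolean =
  required match {
    case HashClustered(exprs) =>
      // Same number of expressions, and equal pair-wise (order matters)
      partitioningExprs.length == exprs.length &&
        partitioningExprs.zip(exprs).forall { case (l, r) => semanticEquals(l, r) }
    case Clustered(clustering) =>
      // Every partitioning expression has to appear among the clustering expressions
      partitioningExprs.forall(p => clustering.exists(c => semanticEquals(c, p)))
  }

val sameExprs     = satisfies(Seq("id"), HashClustered(Seq("ID")))
val differentSize = satisfies(Seq("id"), HashClustered(Seq("id", "name")))
val subset        = satisfies(Seq("id"), Clustered(Seq("name", "id")))
val notSubset     = satisfies(Seq("id", "age"), Clustered(Seq("id")))
```

In this model the HashClusteredDistribution check is the stricter of the two: it requires the same expressions in the same order, while the ClusteredDistribution check accepts any subset of the clustering expressions.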
PartitionId Expression¶
partitionIdExpression: Expression
partitionIdExpression gives an Expression that produces a valid partition ID.
partitionIdExpression is a Pmod expression of a Murmur3Hash (with the partitioning expressions) and a Literal (with the number of partitions).
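The reason for a positive modulo (rather than a plain remainder) is that a hash can be negative while a partition ID cannot. The sketch below models only that arithmetic in plain Scala. The `pmod` function is a hand-written illustration, not Spark's Pmod class, and the sample hash values are arbitrary stand-ins for what Murmur3Hash would produce.

```scala
// Positive modulo: the result is always in [0, numPartitions)
def pmod(hash: Int, numPartitions: Int): Int = {
  val r = hash % numPartitions
  // Scala's % keeps the sign of the dividend, so shift negative remainders up
  if (r < 0) r + numPartitions else r
}

val numPartitions = 200

// Arbitrary stand-in hash values, including negative ones (assumption for illustration)
val sampleHashes = Seq(42, -1, -200, Int.MinValue)

val partitionIds = sampleHashes.map(pmod(_, numPartitions))
```

Every value in `partitionIds` falls in the range of valid partition IDs, including those derived from negative hashes.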
partitionIdExpression is used when:
- BucketingUtils utility is used to getBucketIdFromValue
- FileFormatWriter utility is used to write out a query result (with a bucketing spec)
- ShuffleExchangeExec utility is used to prepare a ShuffleDependency
Demo¶
import org.apache.spark.sql.functions.{hash, lit, pmod}
val nums = spark.range(5)
val numParts = 200 // the default value of spark.sql.shuffle.partitions
val partExprs = Seq(nums("id"))
val partitionIdExpression = pmod(hash(partExprs: _*), lit(numParts))
scala> partitionIdExpression.explain(extended = true)
pmod(hash(id#32L, 42), 200)
val q = nums.withColumn("partitionId", partitionIdExpression)
scala> q.show
+---+-----------+
| id|partitionId|
+---+-----------+
|  0|          5|
|  1|         69|
|  2|        128|
|  3|        107|
|  4|        140|
+---+-----------+