HashPartitioning
HashPartitioning is a Catalyst Partitioning in which rows are distributed across partitions using the Murmur3 hash of the partitioning expressions, taken modulo the number of partitions.
HashPartitioning is also an Expression that computes the partition ID, so that data is distributed consistently across shuffling and bucketing (e.g., for joins of bucketed and regular tables).
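The distribution rule above can be checked end to end. The following is a sketch, assuming Spark 3.x on the classpath and a local SparkSession; the object `HashPlacementCheck` and the sample size of 1000 rows are made up for illustration. It repartitions a range by `id`, which plans a shuffle with a HashPartitioning, and compares each row's actual partition (`spark_partition_id`) with `pmod(hash(id), numParts)` computed through the public `hash` and `pmod` functions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod, spark_partition_id}

object HashPlacementCheck {
  // Counts rows whose actual post-shuffle partition differs from pmod(hash(id), numParts)
  def mismatches(spark: SparkSession, numParts: Int): Long = {
    val placed = spark.range(1000)
      // repartition by an expression plans a shuffle with HashPartitioning(id, numParts)
      .repartition(numParts, col("id"))
      // the partition each row actually ended up in
      .withColumn("actual", spark_partition_id())
      // the partition predicted by the Murmur3-then-pmod rule
      .withColumn("expected", pmod(hash(col("id")), lit(numParts)))
    placed.filter(col("actual") =!= col("expected")).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hash-placement-check")
      // precaution: keep adaptive query execution from changing the shuffle
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    try println(s"rows in an unexpected partition: ${mismatches(spark, 8)}")
    finally spark.stop()
  }
}
```

A count of 0 is what one would expect if the shuffle assigns partitions with the Murmur3-then-pmod rule. Adaptive query execution is disabled only as a precaution, so the shuffle keeps the requested number of partitions.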
Creating Instance
HashPartitioning takes the following to be created:
- Partitioning Expressions
- Number of partitions
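A sketch of creating a HashPartitioning directly with its two arguments, assuming Spark 3.x's catalyst module (an internal, unstable API) is on the classpath. The `id` attribute and the object name are hypothetical.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.LongType

object CreateHashPartitioning {
  // Hypothetical `id` column (catalyst attribute, not a public API)
  val id = AttributeReference("id", LongType)()

  // The two creation arguments: partitioning expressions and number of partitions
  val partitioning = HashPartitioning(Seq(id), 8)

  def main(args: Array[String]): Unit = {
    println(partitioning.expressions)
    println(partitioning.numPartitions)
    // the Pmod-over-Murmur3Hash expression derived from the two arguments
    println(partitioning.partitionIdExpression)
  }
}
```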
HashPartitioning is created when:
- RepartitionByExpression is requested for the partitioning
- RebalancePartitions is requested for the partitioning
- ClusteredDistribution is requested to create a Partitioning
- HashClusteredDistribution is requested to create a Partitioning
- FileSourceScanExec physical operator is requested for the output partitioning
- BucketingUtils utility is used to getBucketIdFromValue
- FileFormatWriter utility is used to write out a query result (with a bucketing spec)
- BroadcastHashJoinExec physical operator is requested to expandOutputPartitioning
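To see the RepartitionByExpression case in a physical plan, the following sketch (assuming Spark 3.x and a local SparkSession; the object name is hypothetical) repartitions a range by `id` and collects the output partitioning of ShuffleExchangeExec operators. It inspects `queryExecution.sparkPlan`, the physical plan before preparation rules such as adaptive query execution are applied, so the exchange is visible regardless of AQE settings.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions.col

object RepartitionPlansHashPartitioning {
  // Returns the HashPartitionings of all shuffle exchanges in the physical plan
  def shufflePartitionings(spark: SparkSession): Seq[HashPartitioning] = {
    val q = spark.range(100).repartition(4, col("id"))
    q.queryExecution.sparkPlan
      .collect { case s: ShuffleExchangeExec => s.outputPartitioning }
      .collect { case h: HashPartitioning => h }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    try shufflePartitionings(spark).foreach(println)
    finally spark.stop()
  }
}
```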
Unevaluable
HashPartitioning is an Unevaluable expression.
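Being Unevaluable means that `eval` throws; the partition ID is computed from partitionIdExpression instead. A sketch assuming Spark 3.x catalyst on the classpath; the object name is hypothetical, and the constant `Literal(5L)` partitioning expression is chosen only so that partitionIdExpression can be evaluated without an input row. `Try` is used because the exception type thrown by `eval` differs between Spark versions.

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

object UnevaluableSketch {
  // Hypothetical: a constant partitioning expression, so no input row is needed
  val partitioning = HashPartitioning(Seq(Literal(5L)), 8)

  // Unevaluable.eval throws; the exception type varies across Spark versions
  def evalFails: Boolean = Try(partitioning.eval()).isFailure

  // partitionIdExpression (Pmod over Murmur3Hash) is an ordinary, evaluable expression
  def partitionId: Int = partitioning.partitionIdExpression.eval().asInstanceOf[Int]

  def main(args: Array[String]): Unit = {
    println(s"HashPartitioning.eval fails: $evalFails")
    println(s"partition ID of 5L among 8 partitions: $partitionId")
  }
}
```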
Satisfying Distribution
```scala
satisfies0(
  required: Distribution): Boolean
```
satisfies0 is positive (true) when either the base satisfies0 (of the Partitioning abstraction) holds or the given Distribution is one of the following:
- A HashClusteredDistribution with the same number of expressions as the partitioning expressions, where the two are semantically equal pair-wise (position by position)
- A ClusteredDistribution whose clustering expressions cover every partitioning expression (each partitioning expression is semantically equal to some clustering expression)
Otherwise, satisfies0 is negative (false).
satisfies0 is part of the Partitioning abstraction.
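A sketch of the ClusteredDistribution rule, assuming Spark 3.x catalyst on the classpath; the attributes `a` and `b` and the object name are hypothetical. HashClusteredDistribution is left out on purpose because its availability differs across Spark versions. ClusteredDistribution is created with only its clustering expressions, so the remaining constructor parameters (which also vary by version) take their defaults.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.{IntegerType, LongType}

object SatisfiesSketch {
  // Hypothetical columns
  val a = AttributeReference("a", LongType)()
  val b = AttributeReference("b", IntegerType)()

  // Hash-partitioned on `a` only, and on both `a` and `b`
  val byA = HashPartitioning(Seq(a), 8)
  val byAB = HashPartitioning(Seq(a, b), 8)

  // Rows with equal (a, b) always have equal `a`, so hashing on `a` alone keeps them together
  val byAIsEnough: Boolean = byA.satisfies(ClusteredDistribution(Seq(a, b)))

  // `b` is not a clustering expression: rows with equal `a` may land in different partitions
  val byABIsEnough: Boolean = byAB.satisfies(ClusteredDistribution(Seq(a)))

  def main(args: Array[String]): Unit = {
    println(s"hash(a) satisfies clustering by (a, b): $byAIsEnough")
    println(s"hash(a, b) satisfies clustering by a: $byABIsEnough")
  }
}
```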
PartitionId Expression
```scala
partitionIdExpression: Expression
```
partitionIdExpression gives an Expression that produces a valid partition ID, i.e. an integer from 0 up to (but excluding) the number of partitions.
partitionIdExpression is a Pmod expression of a Murmur3Hash (of the partitioning expressions) and a Literal (of the number of partitions).
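Pmod (positive modulo) matters here because Murmur3 hashes can be negative, and the JVM remainder operator `%` keeps the sign of the dividend. The following plain Scala sketch (no Spark needed; the object name is hypothetical) shows the positive-modulo arithmetic for `Int` operands.

```scala
object PmodSketch {
  // Positive modulo: shifts a negative remainder back into [0, n)
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) (r + n) % n else r
  }

  def main(args: Array[String]): Unit = {
    println(-7 % 3)      // -1: not a valid partition ID
    println(pmod(-7, 3)) // 2
    println(pmod(7, 3))  // 1
  }
}
```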
partitionIdExpression is used when:
- BucketingUtils utility is used to getBucketIdFromValue
- FileFormatWriter utility is used to write out a query result (with a bucketing spec)
- ShuffleExchangeExec utility is used to prepare a ShuffleDependency
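Demo
The demo reproduces partitionIdExpression with the standard hash and pmod functions. hash uses the same Murmur3Hash expression with the default seed 42 (visible in the explain output), so partitionId is the partition each row would be assigned to by a HashPartitioning on id with 200 partitions.
```scala
import org.apache.spark.sql.functions.{hash, lit, pmod}

val nums = spark.range(5)
val numParts = 200 // the default of spark.sql.shuffle.partitions
val partExprs = Seq(nums("id"))
val partitionIdExpression = pmod(hash(partExprs: _*), lit(numParts))

scala> partitionIdExpression.explain(extended = true)
pmod(hash(id#32L, 42), 200)

val q = nums.withColumn("partitionId", partitionIdExpression)

scala> q.show
+---+-----------+
| id|partitionId|
+---+-----------+
|  0|          5|
|  1|         69|
|  2|        128|
|  3|        107|
|  4|        140|
+---+-----------+
```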