
HashPartitioning

HashPartitioning is a Catalyst Partitioning in which rows are distributed across partitions using the Murmur3 hash of the partitioning expressions, modulo the number of partitions.

HashPartitioning is also an Expression. It computes the partition ID of a row so that data is distributed the same way by shuffling and by bucketing (e.g. for joins of bucketed and regular tables).

Creating Instance

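HashPartitioning takes the following to be created: the partitioning expressions (expressions: Seq[Expression]) and the number of partitions (numPartitions: Int).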

HashPartitioning is created when:

Unevaluable

HashPartitioning is an Unevaluable expression.
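Because HashPartitioning is Unevaluable, evaluating it directly fails, whereas its partitionIdExpression is an ordinary expression that can be evaluated against a row. The following is a minimal spark-shell sketch against Catalyst's internal (non-public, version-dependent) API, assuming Spark 3.x on the classpath. A BoundReference to ordinal 0 stands in for a bigint id column so no attribute binding is needed; the names idRef, hp, evalFailed and partitionIdOf0 are illustrative only.

```scala
import scala.util.Try

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.LongType

// Column 0 of the input row: a non-nullable bigint (like the id column of spark.range)
val idRef = BoundReference(0, LongType, nullable = false)

// Hash-partition rows by that column into 200 partitions
val hp = HashPartitioning(Seq(idRef), 200)

// HashPartitioning is Unevaluable, so calling eval on it throws
val evalFailed = Try(hp.eval(InternalRow(0L))).isFailure

// partitionIdExpression (Pmod over Murmur3Hash) can be evaluated like any other expression
val partitionIdOf0 = hp.partitionIdExpression.eval(InternalRow(0L)).asInstanceOf[Int]

println(s"eval on HashPartitioning failed: $evalFailed, partition ID for id = 0: $partitionIdOf0")
```

Since both compute pmod(hash(id, 42), 200), the printed partition ID is expected to agree with the partitionId shown for id 0 in the Demo section.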

Satisfying Distribution

satisfies0(
  required: Distribution): Boolean

satisfies0 is positive (true) when either the base satisfies0 holds or the given Distribution is one of the following:

Otherwise, satisfies0 is negative (false).

satisfies0 is part of the Partitioning abstraction.
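The effect of satisfies0 can be observed through Partitioning.satisfies, which additionally checks the required number of partitions (if the Distribution specifies one). A minimal spark-shell sketch, assuming Spark 3.x Catalyst classes and default configuration (on Spark 3.3+ the ClusteredDistribution check also depends on spark.sql.requireAllClusterKeysForDistribution, assumed here to be at its default of false). The attributes a and b are hypothetical bigint columns.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, UnspecifiedDistribution}
import org.apache.spark.sql.types.LongType

// Two hypothetical bigint columns (each gets its own exprId)
val a = AttributeReference("a", LongType)()
val b = AttributeReference("b", LongType)()

// Rows hash-partitioned by a into 200 partitions
val hp = HashPartitioning(Seq(a), 200)

// Clustering by (a, b): every partitioning expression (a) is among the required keys
val byAB = hp.satisfies(ClusteredDistribution(Seq(a, b)))
// Clustering by b only: a is not among the required keys
val byB = hp.satisfies(ClusteredDistribution(Seq(b)))
// UnspecifiedDistribution is satisfied by any partitioning (the base satisfies0)
val unspecified = hp.satisfies(UnspecifiedDistribution)
// AllTuples requires a single partition, which 200 partitions do not provide
val allTuples = hp.satisfies(AllTuples)

println(s"(a, b): $byAB, b: $byB, unspecified: $unspecified, all tuples: $allTuples")
```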

PartitionId Expression

partitionIdExpression: Expression

partitionIdExpression gives an Expression that produces a valid partition ID, i.e. a non-negative integer less than the number of partitions.

partitionIdExpression is a Pmod expression of a Murmur3Hash (with the partitioning expressions) and a Literal (with the number of partitions).
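To make the formula concrete, here is a plain-Scala sketch (no Spark dependency) that models partitionIdExpression for a single non-null bigint partitioning expression: Murmur3 x86_32 over the 8 bytes of the value with seed 42 (the seed Murmur3Hash uses by default), followed by the positive modulo that Pmod computes. It is a reimplementation of the Murmur3 algorithm written for illustration, not Spark's code; HashPartitionSketch and its methods are hypothetical names. It does not cover nulls, other data types, or multiple partitioning expressions (where Murmur3Hash uses each expression's hash as the seed for the next one).

```scala
// Models pmod(murmur3(id, seed = 42), numPartitions) for one non-null bigint value
object HashPartitionSketch {
  private val C1 = 0xcc9e2d51
  private val C2 = 0x1b873593

  // Scrambles one 4-byte block
  private def mixK1(k: Int): Int = {
    var k1 = k * C1
    k1 = Integer.rotateLeft(k1, 15)
    k1 * C2
  }

  // Folds a scrambled block into the running hash
  private def mixH1(h: Int, k1: Int): Int = {
    var h1 = h ^ k1
    h1 = Integer.rotateLeft(h1, 13)
    h1 * 5 + 0xe6546b64
  }

  // Final avalanche step; length is the number of input bytes (8 for a Long)
  private def fmix(h: Int, length: Int): Int = {
    var h1 = h ^ length
    h1 ^= h1 >>> 16
    h1 *= 0x85ebca6b
    h1 ^= h1 >>> 13
    h1 *= 0xc2b2ae35
    h1 ^= h1 >>> 16
    h1
  }

  // Murmur3 x86_32 over a Long: low 32 bits first, then high 32 bits
  def hashLong(input: Long, seed: Int): Int = {
    val low = input.toInt
    val high = (input >>> 32).toInt
    var h1 = mixH1(seed, mixK1(low))
    h1 = mixH1(h1, mixK1(high))
    fmix(h1, 8)
  }

  // Positive modulo: unlike %, the result is in [0, n) for n > 0
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) (r + n) % n else r
  }

  def partitionId(id: Long, numPartitions: Int): Int =
    pmod(hashLong(id, 42), numPartitions)
}
```

The positive modulo is what makes the partition ID valid: a plain % on a negative hash would give a negative result. If this transcription matches Spark's Murmur3_x86_32.hashLong, HashPartitionSketch.partitionId(0L, 200) should reproduce the partitionId shown for id 0 in the Demo.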

partitionIdExpression is used when:

Demo

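import org.apache.spark.sql.functions.{hash, lit, pmod} // functions.pmod is available since Spark 3.4
val nums = spark.range(5)
val numParts = 200 // the default number of shuffle partitions (spark.sql.shuffle.partitions)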
val partExprs = Seq(nums("id"))

val partitionIdExpression = pmod(hash(partExprs: _*), lit(numParts))
scala> partitionIdExpression.explain(extended = true)
pmod(hash(id#32L, 42), 200)

val q = nums.withColumn("partitionId", partitionIdExpression)
scala> q.show
+---+-----------+
| id|partitionId|
+---+-----------+
|  0|          5|
|  1|         69|
|  2|        128|
|  3|        107|
|  4|        140|
+---+-----------+
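The partitionId column above can be cross-checked against an actual shuffle: Dataset.repartition(numPartitions, cols) plans a shuffle with HashPartitioning, so spark_partition_id() after the shuffle should equal pmod(hash(id), numPartitions) on every row. A sketch assuming a local Spark 3.x session and that adaptive query execution keeps the explicitly requested 200 partitions (it is not expected to coalesce a shuffle whose partition count was set by the user). The pmod is written with expr so it also works on versions whose functions object lacks pmod; check and mismatches are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, spark_partition_id}

// Returns the existing session in spark-shell; otherwise starts a local one
val spark = SparkSession.builder().master("local[*]").appName("hash-partitioning-check").getOrCreate()

val numParts = 200
// repartition(numPartitions, cols*) shuffles rows with HashPartitioning(id, numParts)
val shuffled = spark.range(5).repartition(numParts, col("id"))

val check = shuffled
  .withColumn("actual", spark_partition_id())                 // partition the row landed in
  .withColumn("expected", expr(s"pmod(hash(id), $numParts)")) // HashPartitioning's formula

check.show()

// Rows whose actual partition differs from the formula (expected: none)
val mismatches = check.filter(col("actual") =!= col("expected")).count()
println(s"mismatches: $mismatches")
```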