
SpaceFillingCurveClustering

SpaceFillingCurveClustering is an extension of the MultiDimClustering abstraction for space-filling-curve-based clustering algorithms.

Contract

ClusteringExpression

getClusteringExpression(
  cols: Seq[Column],
  numRanges: Int): Column

See: ZOrderClustering

Used when: SpaceFillingCurveClustering is requested to cluster (below)
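
To give the contract a more concrete shape, here is a toy expression with the same signature. It is not part of the Delta sources and not a real space-filling curve (ZOrderClustering interleaves the bits of range-partition IDs instead, as the physical plan in the demo below shows); it merely maps the clustering columns to a single clustering key:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{abs, concat_ws, hash}

// Toy stand-in for getClusteringExpression: buckets every clustering column into
// numRanges ranges (via a hash) and glues the bucket IDs into one clustering key
def toyClusteringExpression(cols: Seq[Column], numRanges: Int): Column =
  concat_ws("-", cols.map(c => abs(hash(c)) % numRanges): _*)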

Implementations

ZOrderClustering

Multi-Dimensional Clustering

cluster(
  df: DataFrame,
  colNames: Seq[String],
  approxNumPartitions: Int,
  randomizationExpressionOpt: Option[Column]): DataFrame

cluster is part of the MultiDimClustering abstraction.

cluster converts the given colNames into Columns (using the given df DataFrame).
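
In Spark API terms that is a simple lookup of every name on the input DataFrame, along these lines:

// Resolve the clustering column names against the input DataFrame
val cols = colNames.map(name => df(name))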

cluster adds two column expressions to the given DataFrame (see the sketch right after this list):

  1. [randomUUID]-rpKey1 with the clustering expression (getClusteringExpression) based on the colNames columns and the value of the spark.databricks.io.skipping.mdc.rangeId.max configuration property
  2. [randomUUID]-rpKey2 with extra noise (independent and identically distributed samples, uniformly distributed over [0.0, 1.0), generated with the rand standard function)
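
A back-of-the-envelope sketch of the two columns (with addNoise enabled; cols and df come from the earlier step, and rangeIdMax stands for the value of the spark.databricks.io.skipping.mdc.rangeId.max property):

import java.util.UUID
import org.apache.spark.sql.functions.rand

// Random UUIDs keep the helper column names from clashing with the user's columns
val rpKey1 = s"${UUID.randomUUID()}-rpKey1"
val rpKey2 = s"${UUID.randomUUID()}-rpKey2"

val withKeys = df
  .withColumn(rpKey1, getClusteringExpression(cols, rangeIdMax)) // the clustering key
  .withColumn(rpKey2, rand()) // i.i.d. noise in [0.0, 1.0) (scaled to a tinyint in the demo's physical plan)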

cluster uses the rpKey2 column only with the spark.databricks.io.skipping.mdc.addNoise configuration property enabled.
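
Assuming both are regular (runtime-settable) Spark SQL configuration properties, the demo below could be tweaked along these lines:

// Turn the extra noise column off, so only the clustering key drives the range partitioning
spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")

// Use fewer range IDs in the clustering expression
spark.conf.set("spark.databricks.io.skipping.mdc.rangeId.max", "100")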

cluster repartitions the given DataFrame by the rpKey1 and rpKey2 partitioning expressions into approxNumPartitions partitions (using the Dataset.repartitionByRange operator).

In the end, cluster returns the repartitioned DataFrame with the two helper columns dropped.
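
Continuing the sketch from above (again, with addNoise enabled):

import org.apache.spark.sql.functions.col

// Range-partition by the two helper columns, then drop them from the result
val repartitioned = withKeys
  .repartitionByRange(approxNumPartitions, col(rpKey1), col(rpKey2))
  .drop(rpKey1, rpKey2)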

Demo

import org.apache.spark.sql.delta.skipping.ZOrderClustering
val df = spark.range(5).toDF
val colNames = "id" :: Nil
val approxNumPartitions = 3
val repartByRangeDf = ZOrderClustering.cluster(df, colNames, approxNumPartitions)
println(repartByRangeDf.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Project [id#0L]
02    +- Exchange rangepartitioning(1075d2d7-0cd9-4ce2-be11-ff345ff45047-rpKey1#3 ASC NULLS FIRST, f311bca4-4b5a-4bd8-b9e6-5fb05570c8e1-rpKey2#6 ASC NULLS FIRST, 3), REPARTITION_BY_NUM, [id=#21]
03       +- Project [id#0L, cast(interleavebits(partitionerexpr(id#0L, org.apache.spark.RangePartitioner@442452be)) as string) AS 1075d2d7-0cd9-4ce2-be11-ff345ff45047-rpKey1#3, cast(((rand(5298108963717326846) * 255.0) - 128.0) as tinyint) AS f311bca4-4b5a-4bd8-b9e6-5fb05570c8e1-rpKey2#6]
04          +- Range (0, 5, step=1, splits=16)
assert(repartByRangeDf.rdd.getNumPartitions == 3)

I'm sure the demo begs for some more love, but let's explore the parquet data files anyway. The dataset is very basic (yet I'm hoping someone will take it from there).

repartByRangeDf.write.format("delta").save("/tmp/zorder")
$ tree /tmp/zorder
/tmp/zorder
├── _delta_log
│   └── 00000000000000000000.json
├── part-00000-5677c9b7-7439-4365-8233-8f0e6184dcf3-c000.snappy.parquet
├── part-00001-9dfb628f-c225-4d23-acb8-0946f0c3617d-c000.snappy.parquet
└── part-00002-e1172c78-4d47-4947-bd37-f67c72701062-c000.snappy.parquet
scala> spark.read.load("/tmp/zorder/part-00000-*").show
+---+
| id|
+---+
|  0|
|  1|
+---+


scala> spark.read.load("/tmp/zorder/part-00001-*").show
+---+
| id|
+---+
|  2|
|  3|
+---+


scala> spark.read.load("/tmp/zorder/part-00002-*").show
+---+
| id|
+---+
|  4|
+---+