SpaceFillingCurveClustering¶
SpaceFillingCurveClustering is an extension of the MultiDimClustering abstraction for space-filling-curve-based clustering algorithms.
Contract¶
ClusteringExpression¶
getClusteringExpression(
cols: Seq[Column],
numRanges: Int): Column
Used when:
SpaceFillingCurveClustering is requested to execute multi-dimensional clustering
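For illustration only, the following is a rough sketch of what a clustering expression could look like when built with public Spark SQL functions alone. The naiveClusteringExpression name and the hash-and-concatenate scheme are made up for this page; the actual ZOrderClustering implementation interleaves the bits of range-partition IDs (as visible in the physical plan in the Demo below).
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
// Hypothetical clustering expression (NOT the Delta Lake implementation):
// bucket every column into [0, numRanges) and concatenate the bucket IDs
// into a single string sort key.
def naiveClusteringExpression(cols: Seq[Column], numRanges: Int): Column = {
  require(cols.nonEmpty, "at least one clustering column is required")
  val buckets = cols.map(c => abs(hash(c)) % numRanges)
  concat_ws("-", buckets.map(_.cast("string")): _*)
}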
Implementations¶
ZOrderClustering
Multi-Dimensional Clustering¶
MultiDimClustering is the abstraction that SpaceFillingCurveClustering extends. The entry point is the cluster method:
cluster(
df: DataFrame,
colNames: Seq[String],
approxNumPartitions: Int,
randomizationExpressionOpt: Option[Column]): DataFrame
cluster is part of the MultiDimClustering abstraction.
cluster converts the given colNames into Columns (using the given df dataframe).
cluster adds two column expressions (to the given DataFrame):
[randomUUID]-rpKey1 for a clustering expression (with the colNames and the spark.databricks.io.skipping.mdc.rangeId.max configuration property)
[randomUUID]-rpKey2 for extra noise (independent and identically distributed samples uniformly distributed in [0.0, 1.0), using the rand standard function)
cluster uses the rpKey2 column only with the spark.databricks.io.skipping.mdc.addNoise configuration property enabled.
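Both properties look like regular Spark SQL (session) configuration properties; assuming that holds, they could be adjusted before triggering the clustering:
// Assumption: both are session-scoped Spark SQL configuration properties
spark.conf.set("spark.databricks.io.skipping.mdc.rangeId.max", 1000L)
spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", false) // skip the rpKey2 noise column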
cluster repartitions the given DataFrame by the rpKey1 and rpKey2 partitioning expressions into the approxNumPartitions partitions (using Dataset.repartitionByRange operator).
In the end, cluster returns the repartitioned DataFrame (with the two columns to be dropped).
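To make the steps above concrete, here is a simplified re-enactment of cluster using only the public Dataset API. It is a sketch, not the actual Delta Lake code: the clustering key below is just the column values concatenated as strings (a real space-filling-curve expression, e.g. bit interleaving, goes there), and the configuration properties are replaced with an addNoise parameter.
import java.util.UUID
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
// A simplified re-enactment of cluster:
// 1. resolve the clustering columns against df
// 2. add a clustering-key column and a noise column
// 3. repartition by range on both columns
// 4. drop the helper columns
def clusterSketch(
    df: DataFrame,
    colNames: Seq[String],
    approxNumPartitions: Int,
    addNoise: Boolean = true): DataFrame = {
  val cols = colNames.map(df(_))
  val keyCol = s"${UUID.randomUUID()}-rpKey1"
  val noiseCol = s"${UUID.randomUUID()}-rpKey2"
  // Placeholder clustering expression (a proper one would use a space-filling curve)
  val withKeys = df
    .withColumn(keyCol, concat_ws("-", cols.map(_.cast("string")): _*))
    .withColumn(noiseCol, rand())
  val partitionCols = if (addNoise) Seq(keyCol, noiseCol) else Seq(keyCol)
  withKeys
    .repartitionByRange(approxNumPartitions, partitionCols.map(col): _*)
    .drop(keyCol, noiseCol)
}
The noise column presumably matters when many rows share the same clustering key: without it, repartitionByRange could place all of them into a single oversized partition.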
Demo¶
import org.apache.spark.sql.delta.skipping.ZOrderClustering
val df = spark.range(5).toDF
val colNames = "id" :: Nil
val approxNumPartitions = 3
val repartByRangeDf = ZOrderClustering.cluster(df, colNames, approxNumPartitions)
println(repartByRangeDf.queryExecution.executedPlan.numberedTreeString)
00 AdaptiveSparkPlan isFinalPlan=false
01 +- Project [id#0L]
02 +- Exchange rangepartitioning(1075d2d7-0cd9-4ce2-be11-ff345ff45047-rpKey1#3 ASC NULLS FIRST, f311bca4-4b5a-4bd8-b9e6-5fb05570c8e1-rpKey2#6 ASC NULLS FIRST, 3), REPARTITION_BY_NUM, [id=#21]
03 +- Project [id#0L, cast(interleavebits(partitionerexpr(id#0L, org.apache.spark.RangePartitioner@442452be)) as string) AS 1075d2d7-0cd9-4ce2-be11-ff345ff45047-rpKey1#3, cast(((rand(5298108963717326846) * 255.0) - 128.0) as tinyint) AS f311bca4-4b5a-4bd8-b9e6-5fb05570c8e1-rpKey2#6]
04 +- Range (0, 5, step=1, splits=16)
assert(repartByRangeDf.rdd.getNumPartitions == 3)
The demo surely begs for some more love, but let's explore the data in the parquet data files anyway. The dataset is very basic (yet I hope someone will take it from there).
repartByRangeDf.write.format("delta").save("/tmp/zorder")
$ tree /tmp/zorder
/tmp/zorder
├── _delta_log
│ └── 00000000000000000000.json
├── part-00000-5677c9b7-7439-4365-8233-8f0e6184dcf3-c000.snappy.parquet
├── part-00001-9dfb628f-c225-4d23-acb8-0946f0c3617d-c000.snappy.parquet
└── part-00002-e1172c78-4d47-4947-bd37-f67c72701062-c000.snappy.parquet
scala> spark.read.load("/tmp/zorder/part-00000-*").show
+---+
| id|
+---+
| 0|
| 1|
+---+
scala> spark.read.load("/tmp/zorder/part-00001-*").show
+---+
| id|
+---+
| 2|
| 3|
+---+
scala> spark.read.load("/tmp/zorder/part-00002-*").show
+---+
| id|
+---+
| 4|
+---+
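The single-column demo does not really show the multi-dimensional part, so a natural next step is clustering on two columns. The table path and the column names below are just examples:
import org.apache.spark.sql.delta.skipping.ZOrderClustering
// Two clustering dimensions this time
val df2 = spark.range(100).selectExpr("id % 10 as x", "id as y")
val clustered = ZOrderClustering.cluster(df2, Seq("x", "y"), 4)
assert(clustered.rdd.getNumPartitions == 4)
clustered.write.format("delta").save("/tmp/zorder-xy")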