DeltaTable — Management Interface Of Delta Tables

DeltaTable instances are created for existing Delta tables using the DeltaTable.forPath utility (static method).

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val path = "/tmp/delta/t1"

// random data to create a delta table from scratch
val data = spark.range(5)
data.write.format("delta").save(path)

import io.delta.tables.DeltaTable
val dt = DeltaTable.forPath(spark, path)

val history = dt.history.select('version, 'timestamp, 'operation, 'operationParameters, 'isBlindAppend)
scala> history.show(truncate = false)
+-------+-------------------+---------+------------------------------------------+-------------+
|version|timestamp          |operation|operationParameters                       |isBlindAppend|
+-------+-------------------+---------+------------------------------------------+-------------+
|0      |2019-12-23 22:24:40|WRITE    |[mode -> ErrorIfExists, partitionBy -> []]|true         |
+-------+-------------------+---------+------------------------------------------+-------------+

You can convert parquet tables to delta format (by importing the tables into Delta Lake) using the DeltaTable.convertToDelta utility (static method).

You can check whether a directory is part of a delta table using the DeltaTable.isDeltaTable utility (static method).

Table 1. DeltaTable API / Operators
Name Description

alias

alias(alias: String): DeltaTable

Applies an alias to the DeltaTable (equivalent to as)

as

as(alias: String): DeltaTable

Applies an alias to the DeltaTable (equivalent to alias)

delete

delete(): Unit (1)
delete(condition: Column): Unit
delete(condition: String): Unit
1 Undefined condition

Deletes data from the DeltaTable that matches the given condition.

delete and update operators do not support subqueries (and throw an AnalysisException when the condition contains one).
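
For example, against the dt delta table created above (a single id column from spark.range(5)):

import org.apache.spark.sql.functions.col

// Delete the rows that match a SQL condition...
dt.delete("id > 3")

// ...or an equivalent Column-based condition
dt.delete(col("id") > 3)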

generate

generate(mode: String): Unit

Generates a manifest for the delta table

Internally, generate executes executeGenerate with a table identifier of the format delta.`path` (where path is the data directory of the DeltaLog) and the given mode.
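
For example, to generate symlink-format manifest files (the symlink_format_manifest mode, used by external readers such as Presto):

dt.generate("symlink_format_manifest")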

history

history(): DataFrame (1)
history(
  limit: Int): DataFrame
1 Unlimited history

Gets available commits (history) on the DeltaTable
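
For example, to limit the history to the most recent commit only:

dt.history(1).select('version, 'operation).show(truncate = false)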

merge

merge(
  source: DataFrame,
  condition: Column): DeltaMergeBuilder
merge(
  source: DataFrame,
  condition: String): DeltaMergeBuilder

Creates a DeltaMergeBuilder to specify how to merge the data from the source DataFrame into this delta table based on the condition.
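
A minimal sketch against the single-column dt table created above (the source IDs are arbitrary and partly overlap the target's 0 to 4):

val source = spark.range(3, 8).toDF("id")

dt.as("t")
  .merge(source.as("s"), "t.id = s.id")
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()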

toDF

toDF: Dataset[Row]

Gets the DataFrame representation of the DeltaTable

update

update(
  condition: Column,
  set: Map[String, Column]): Unit
update(
  condition: Column,
  set: java.util.Map[String, Column]): Unit
update(
  set: Map[String, Column]): Unit
update(
  set: java.util.Map[String, Column]): Unit

Updates data in the DeltaTable on the rows that match the given condition based on the rules defined by set

delete and update operators do not support subqueries (and throw an AnalysisException when the condition contains one).
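
For example, against the dt table created above:

import org.apache.spark.sql.functions.{col, lit}

// Update the matching rows only
dt.update(col("id") > 2, Map("id" -> (col("id") + 100)))

// Update all rows (no condition)
dt.update(Map("id" -> lit(0)))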

updateExpr

updateExpr(
  set: Map[String, String]): Unit
updateExpr(
  set: java.util.Map[String, String]): Unit
updateExpr(
  condition: String,
  set: Map[String, String]): Unit
updateExpr(
  condition: String,
  set: java.util.Map[String, String]): Unit

Updates data in the DeltaTable on the rows that match the given condition based on the rules defined by set
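
The same updates as above, with the condition and the new values given as SQL expressions:

dt.updateExpr("id > 2", Map("id" -> "id + 100"))

dt.updateExpr(Map("id" -> "0"))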

vacuum

vacuum(): DataFrame (1)
vacuum(retentionHours: Double): DataFrame
1 Undefined retention threshold

Deletes files and directories (recursively) in the DeltaTable that are not needed by the table (and maintains older versions up to the given retention threshold).

Internally, vacuum simply executes the vacuum command.
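
For example, to remove files that are no longer needed by versions older than 7 days (the default retention threshold):

dt.vacuum(168) // retention threshold in hours

// Or rely on the default retention threshold
dt.vacuum()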

Creating DeltaTable Instance

DeltaTable takes the following to be created:

  • Distributed structured query to access table data (Dataset[Row])

  • DeltaLog

You should create new DeltaTable instances using DeltaTable.forPath and DeltaTable.convertToDelta utilities.

Creating DeltaTable — DeltaTable.forPath Utility

forPath(
  path: String): DeltaTable (1)
forPath(
  sparkSession: SparkSession,
  path: String): DeltaTable
1 Uses SparkSession.getActiveSession to access the SparkSession

forPath creates a DeltaTable instance for data in the given directory (path) when the given directory is part of a delta table already (as the root or a child directory).

assert(spark.isInstanceOf[org.apache.spark.sql.SparkSession])

val tableId = "/tmp/delta-table/users"

import io.delta.tables.DeltaTable
assert(DeltaTable.isDeltaTable(tableId), s"$tableId should be a Delta table")

val dt = DeltaTable.forPath(tableId)

forPath throws an AnalysisException when the given path does not belong to a delta table:

[deltaTableIdentifier] is not a Delta table.

Internally, forPath creates a new DeltaTable with a Dataset that represents the data in the given path (in delta format) and the DeltaLog for the path (and the given SparkSession).

forPath is a public API, and is also used internally for DeltaTable.convertToDelta (via DeltaConvert utility).

Converting Parquet Table To Delta Format (Importing Parquet Table Into Delta Lake) — DeltaTable.convertToDelta Utility

convertToDelta(
  spark: SparkSession,
  identifier: String,
  partitionSchema: StructType): DeltaTable
convertToDelta(
  spark: SparkSession,
  identifier: String,
  partitionSchema: String): DeltaTable  (1)
convertToDelta(
  spark: SparkSession,
  identifier: String): DeltaTable
1 Creates StructType from the given DDL-formatted partitionSchema string

convertToDelta converts a parquet table to delta format (and makes the table available in Delta Lake).

Refer to Demo: Converting Parquet Dataset Into Delta Format for a demo of DeltaTable.convertToDelta.

Internally, convertToDelta requests the SparkSession for the SQL parser (ParserInterface) that is in turn requested to parse the given table identifier (to get a TableIdentifier).

Read up on ParserInterface in The Internals of Spark SQL online book.

In the end, convertToDelta uses the DeltaConvert utility to convert the parquet table to delta format and creates a DeltaTable.
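
A minimal sketch (the parquet directories are hypothetical; the identifier uses the parquet.`path` format):

import io.delta.tables.DeltaTable

// Non-partitioned parquet table
val converted = DeltaTable.convertToDelta(spark, "parquet.`/tmp/parquet-table`")

// Partitioned parquet table (DDL-formatted partition schema)
val convertedPartitioned = DeltaTable.convertToDelta(
  spark,
  "parquet.`/tmp/partitioned-parquet-table`",
  "part INT")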

Checking Out Whether Directory Is Part of Delta Table — isDeltaTable Utility

isDeltaTable(
  identifier: String): Boolean
isDeltaTable(
  sparkSession: SparkSession,
  identifier: String): Boolean

isDeltaTable checks whether or not the provided identifier string is a file path that points to the root of a Delta table or one of its subdirectories.

Internally, isDeltaTable simply relays to DeltaTableUtils.isDeltaTable utility.
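
For example (assuming the /tmp/delta/t1 table created at the beginning of this page and a hypothetical /tmp/parquet-table directory that is not part of a delta table):

import io.delta.tables.DeltaTable

assert(DeltaTable.isDeltaTable(spark, "/tmp/delta/t1"))
assert(!DeltaTable.isDeltaTable(spark, "/tmp/parquet-table"))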

unapply Extractor Utility

unapply(
  a: LogicalRelation): Option[TahoeFileIndex]

unapply simply destructures the given LogicalRelation and takes out the TahoeFileIndex from the HadoopFsRelation relation.

unapply is used when:

  • DeltaSink is requested to addBatch

  • DeltaDataSource utility is used to extractDeltaPath (but does not seem to be used whatsoever)