
DeltaTable

DeltaTable is the management interface of a Delta table.

DeltaTable instances are created using utilities (e.g. DeltaTable.forName, DeltaTable.convertToDelta).

Utilities (Static Methods)

convertToDelta

convertToDelta(
  spark: SparkSession,
  identifier: String): DeltaTable
convertToDelta(
  spark: SparkSession,
  identifier: String,
  partitionSchema: String): DeltaTable
convertToDelta(
  spark: SparkSession,
  identifier: String,
  partitionSchema: StructType): DeltaTable

convertToDelta converts a parquet table to delta format (and makes the table available in Delta Lake).

Note

Refer to Demo: Converting Parquet Dataset Into Delta Format for a demo of DeltaTable.convertToDelta.

Internally, convertToDelta requests the SparkSession for the SQL parser (ParserInterface) that is in turn requested to parse the given table identifier (to get a TableIdentifier).

Tip

Read up on ParserInterface in The Internals of Spark SQL online book.

In the end, convertToDelta uses the DeltaConvert utility to convert the parquet table to delta format and creates a DeltaTable.
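
A minimal sketch of the conversion (the paths and the date partition column are made up for illustration):

import io.delta.tables.DeltaTable

// Convert an unpartitioned parquet directory in place
val converted = DeltaTable.convertToDelta(spark, "parquet.`/tmp/parquet-table`")

// Convert a parquet directory partitioned by a date column,
// passing the partition schema as a DDL-formatted string
val convertedPartitioned = DeltaTable.convertToDelta(
  spark,
  "parquet.`/tmp/partitioned-parquet-table`",
  "date DATE")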

forName

forName(
  sparkSession: SparkSession,
  tableName: String): DeltaTable
forName(
  tableOrViewName: String): DeltaTable

forName uses ParserInterface (of the given SparkSession) to parse the given table name.

forName checks whether the given table name is of a Delta table and, if so, creates a DeltaTable with the following:

  • Dataset that represents loading data from the specified table name (using SparkSession.table operator)
  • DeltaLog of the specified table

forName throws an AnalysisException when the given table name is for a non-Delta table:

[deltaTableIdentifier] is not a Delta table.

forName is used internally when the DeltaConvert utility is requested to executeConvert.
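
A usage sketch, assuming a Delta table is already registered in the metastore under the (made-up) name users:

import io.delta.tables.DeltaTable
val users = DeltaTable.forName(spark, "users")
// The overload without a SparkSession uses the active session
val sameUsers = DeltaTable.forName("users")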

forPath

forPath(
  sparkSession: SparkSession,
  path: String): DeltaTable
forPath(
  path: String): DeltaTable

forPath creates a DeltaTable instance for data in the given directory (path) when the given directory is part of a delta table already (as the root or a child directory).

assert(spark.isInstanceOf[org.apache.spark.sql.SparkSession])

val tableId = "/tmp/delta-table/users"

import io.delta.tables.DeltaTable
assert(DeltaTable.isDeltaTable(tableId), s"$tableId should be a Delta table")

val dt = DeltaTable.forPath(tableId)

forPath throws an AnalysisException when the given path does not belong to a delta table:

[deltaTableIdentifier] is not a Delta table.

Internally, forPath creates a new DeltaTable with the following:

  • Dataset that represents loading data from the specified path (using the delta data source)
  • DeltaLog of the specified path

forPath is used internally in DeltaTable.convertToDelta (via DeltaConvert utility).

isDeltaTable

isDeltaTable(
  sparkSession: SparkSession,
  identifier: String): Boolean
isDeltaTable(
  identifier: String): Boolean

isDeltaTable checks whether the provided identifier string is a file path that points to the root of a Delta table or one of its subdirectories.

Internally, isDeltaTable simply relays to the DeltaTableUtils.isDeltaTable utility.
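
A usage sketch (the paths are illustrative):

import io.delta.tables.DeltaTable
// true for the root of a Delta table (or one of its subdirectories)
DeltaTable.isDeltaTable(spark, "/tmp/delta/t1")
// false for a plain parquet directory
DeltaTable.isDeltaTable("/tmp/parquet-table")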

Creating Instance

DeltaTable takes the following to be created:

  • Dataset (Dataset[Row])
  • DeltaLog

DeltaTable is created using DeltaTable.forPath or DeltaTable.forName utilities.

Operators

alias

alias(
  alias: String): DeltaTable

Applies an alias to the DeltaTable (equivalent to as)

as

as(
  alias: String): DeltaTable

Applies an alias to the DeltaTable
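
A short sketch (the path and the alias are illustrative); the alias becomes useful in merge conditions:

import io.delta.tables.DeltaTable
val target = DeltaTable.forPath(spark, "/tmp/delta/users").as("t")
// "t" can now be referenced in conditions, e.g. "t.id = s.id" in merge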

delete

delete(): Unit
delete(
  condition: Column): Unit
delete(
  condition: String): Unit

Deletes data from the DeltaTable that matches the given condition.
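
A short sketch (the dt table and the id column are illustrative):

import org.apache.spark.sql.functions.col
// Delete with a Column condition
dt.delete(col("id") < 2)
// Delete with a SQL-like predicate string
dt.delete("id < 2")
// With no condition, all rows are deleted
dt.delete()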

generate

generate(
  mode: String): Unit

Generates a manifest for the delta table

generate executes executeGenerate with a table identifier of the format delta.`path` (where path is the data directory of the DeltaLog) and the given mode.
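
A usage sketch with the symlink manifest mode (dt is an existing DeltaTable):

// Generates manifest files for readers that rely on symlink manifests
dt.generate("symlink_format_manifest")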

history

history(): DataFrame
history(
  limit: Int): DataFrame

Gets available commits (history) of the DeltaTable

merge

merge(
  source: DataFrame,
  condition: Column): DeltaMergeBuilder
merge(
  source: DataFrame,
  condition: String): DeltaMergeBuilder

Creates a DeltaMergeBuilder
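
A minimal sketch of the builder in action, assuming target is a DeltaTable and updates is a DataFrame that share an id column (both illustrative):

target.as("t")
  .merge(updates.as("s"), "t.id = s.id")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()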

toDF

toDF: Dataset[Row]

Returns the DataFrame representation of the DeltaTable

update

update(
  condition: Column,
  set: Map[String, Column]): Unit
update(
  set: Map[String, Column]): Unit

Updates data in the DeltaTable on the rows that match the given condition based on the rules defined by set
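
A short sketch using Column expressions (dt and the column names are illustrative):

import org.apache.spark.sql.functions.{col, lit}
// Set flag to 0 on rows with id > 2
dt.update(col("id") > 2, Map("flag" -> lit(0)))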

updateExpr

updateExpr(
  set: Map[String, String]): Unit
updateExpr(
  condition: String,
  set: Map[String, String]): Unit

Updates data in the DeltaTable on the rows that match the given condition based on the rules defined by set
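
The same update expressed with SQL strings (dt and the column names are illustrative):

dt.updateExpr("id > 2", Map("flag" -> "0"))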

vacuum

vacuum(): DataFrame
vacuum(
  retentionHours: Double): DataFrame

Deletes files and directories (recursively) in the DeltaTable that are not needed by the table (and maintains older versions up to the given retention threshold).

vacuum executes the vacuum command.
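
A short sketch (dt is an existing DeltaTable):

// Use the default retention threshold (7 days)
dt.vacuum()
// Or an explicit retention threshold in hours
dt.vacuum(168)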

Demo

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

val path = "/tmp/delta/t1"

// random data to create a delta table from scratch
val data = spark.range(5)
data.write.format("delta").save(path)

import io.delta.tables.DeltaTable
val dt = DeltaTable.forPath(spark, path)

val history = dt.history.select('version, 'timestamp, 'operation, 'operationParameters, 'isBlindAppend)
scala> history.show(truncate = false)
+-------+-------------------+---------+------------------------------------------+-------------+
|version|timestamp          |operation|operationParameters                       |isBlindAppend|
+-------+-------------------+---------+------------------------------------------+-------------+
|0      |2019-12-23 22:24:40|WRITE    |[mode -> ErrorIfExists, partitionBy -> []]|true         |
+-------+-------------------+---------+------------------------------------------+-------------+
