
DeltaDataSource

DeltaDataSource ties Delta Lake and Spark SQL (with Spark Structured Streaming) together as the delta data source that supports batch and streaming queries.

DataSourceRegister and delta Alias

DeltaDataSource is a DataSourceRegister (Spark SQL) and registers the delta alias.

DeltaDataSource is registered using META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

org.apache.spark.sql.delta.sources.DeltaDataSource
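
For illustration, the delta alias and the fully-qualified class name are interchangeable (the table path below is hypothetical):

val users = spark.read.format("delta").load("/tmp/delta/users")
// the fully-qualified class name resolves to the same data source
val same = spark.read
  .format("org.apache.spark.sql.delta.sources.DeltaDataSource")
  .load("/tmp/delta/users")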

RelationProvider

DeltaDataSource is a RelationProvider (Spark SQL).

Creating BaseRelation for Table Scan

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation

createRelation is part of the RelationProvider (Spark SQL) abstraction.


createRelation verifies the given parameters (options).

createRelation extracts the time travel specification (from the given parameters).

createRelation collects CDF-specific options (when change data feed is enabled), as shown in the sketch below.

createRelation creates a DeltaTableV2 (with the given parameters as options when spark.databricks.delta.loadFileSystemConfigsFromDataFrameOptions configuration property is enabled).

In the end, createRelation requests the DeltaTableV2 for an insertable HadoopFsRelation.
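
A minimal sketch of a CDF-enabled batch scan that exercises this code path, assuming a delta table at the hypothetical /tmp/delta/users path (readChangeFeed and startingVersion are the documented CDF options):

val changes = spark.read
  .format("delta")
  .option("readChangeFeed", "true") // enable the Change Data Feed scan
  .option("startingVersion", 0L)    // a CDF-specific option
  .load("/tmp/delta/users")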

path Parameter is Required

createRelation makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:

'path' is not specified

CreatableRelationProvider

DeltaDataSource is a CreatableRelationProvider (Spark SQL).

Creating BaseRelation after Data Writing

createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  data: DataFrame): BaseRelation

createRelation is part of the CreatableRelationProvider (Spark SQL) abstraction.


createRelation creates a DeltaLog for the required path parameter (from the given parameters) and the given parameters themselves.

createRelation creates a DeltaOptions (with the given parameters and the current SQLConf).

createRelation validateConfigurations (with the delta.-prefixed keys in the given parameters).

createRelation creates and executes a WriteIntoDelta command with the given data.

In the end, createRelation requests the DeltaLog for a BaseRelation.
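
For illustration, a batch write that goes through this createRelation (the path is hypothetical):

import org.apache.spark.sql.SaveMode

spark.range(5).write
  .format("delta")
  .mode(SaveMode.Append) // the SaveMode handed over to WriteIntoDelta
  .save("/tmp/delta/numbers") // the required path parameter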

path Parameter is Required

createRelation makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:

'path' is not specified

StreamSourceProvider

DeltaDataSource is a StreamSourceProvider (Spark Structured Streaming).

Creating DeltaSource

createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source

createSource creates a DeltaLog for the required path parameter (from the given parameters).

createSource creates a DeltaOptions (with the given parameters and the current SQLConf).

In the end, createSource creates a DeltaSource (with the DeltaLog and the DeltaOptions).


createSource makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:

'path' is not specified

createSource makes sure that no schema is specified or throws an AnalysisException:

Delta does not support specifying the schema at read time.

createSource makes sure that a schema is available (in the Snapshot of the DeltaLog) or throws an AnalysisException:

Table schema is not set.  Write data into it or use CREATE TABLE to set the schema.

createSource is part of the StreamSourceProvider (Spark Structured Streaming) abstraction.
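
A minimal streaming read that ends up in createSource (the path is hypothetical):

val events = spark.readStream
  .format("delta") // DeltaDataSource by the delta alias
  .load("/tmp/delta/events") // the required path parameter
// note: no .schema(...) call — Delta does not support specifying the schema at read time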

Streaming Source Schema

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)

sourceSchema creates a DeltaLog for the required path parameter (from the given parameters).

sourceSchema takes the schema (of the Snapshot) of the DeltaLog and removes default expressions.

In the end, sourceSchema returns the delta name with the table schema.


sourceSchema makes sure that no schema is specified or throws an AnalysisException:

Delta does not support specifying the schema at read time.

sourceSchema makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:

'path' is not specified

sourceSchema makes sure that no time travel is specified using the following:

  • the time travel options (timestampAsOf and versionAsOf)
  • a time travel identifier in the path (parsePathIdentifier)

If either is set, sourceSchema throws an AnalysisException:

Cannot time travel views, subqueries or streams.

sourceSchema is part of the StreamSourceProvider (Spark Structured Streaming) abstraction.

StreamSinkProvider

DeltaDataSource is a StreamSinkProvider (Spark Structured Streaming).

DeltaDataSource supports Append and Complete output modes only.

Creating Streaming Sink

createSink(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  partitionColumns: Seq[String],
  outputMode: OutputMode): Sink

createSink creates a DeltaOptions (with the given parameters and the current SQLConf).

In the end, createSink creates a DeltaSink (with the required path parameter, the given partitionColumns and the DeltaOptions).


createSink makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:

'path' is not specified

createSink makes sure that the given outputMode is either Append or Complete, or throws an IllegalArgumentException:

Data source [dataSource] does not support [outputMode] output mode

createSink is part of the StreamSinkProvider (Spark Structured Streaming) abstraction.
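
A hedged sketch of a streaming write to a DeltaSink, assuming events is a streaming DataFrame and the paths are hypothetical:

events.writeStream
  .format("delta")
  .outputMode("append") // Append or Complete only
  .option("checkpointLocation", "/tmp/delta/events/_checkpoint")
  .start("/tmp/delta/events") // the required path parameter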

TableProvider

DeltaDataSource is a TableProvider (Spark SQL).

Loading Delta Table

getTable(
  schema: StructType,
  partitioning: Array[Transform],
  properties: Map[String, String]): Table

getTable is part of the TableProvider (Spark SQL) abstraction.

getTable creates a DeltaTableV2 (with the path from the given properties).


getTable throws an IllegalArgumentException when the path option is not specified:

'path' is not specified

Schema Tracking Location

extractSchemaTrackingLocationConfig(
  spark: SparkSession,
  parameters: Map[String, String]): Option[String]

extractSchemaTrackingLocationConfig returns the value of the following options (if defined):

  • schemaTrackingLocation
  • schemaLocation (an alias)
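
A minimal sketch of defining a schema tracking location on the reader side (the paths are hypothetical):

spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/delta/events/_schema_log")
  .load("/tmp/delta/events")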


extractSchemaTrackingLocationConfig is used when:

  • DeltaDataSource is requested for the metadata tracking log of a delta source (getMetadataTrackingLogForDeltaSource)

Creating DeltaTimeTravelSpec

getTimeTravelVersion(
  parameters: Map[String, String]): Option[DeltaTimeTravelSpec]

getTimeTravelVersion reads the following options (from the given parameters):

  • timestampAsOf
  • versionAsOf
  • __time_travel_source__

getTimeTravelVersion creates a DeltaTimeTravelSpec if either timestampAsOf or versionAsOf is defined. The DeltaTimeTravelSpec is created with the creationSource based on __time_travel_source__ (if specified) or defaults to dfReader.
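
For illustration, either option on a batch read makes getTimeTravelVersion create a DeltaTimeTravelSpec (the path and the timestamp are hypothetical):

spark.read.format("delta").option("versionAsOf", 0L).load("/tmp/delta/users")
spark.read.format("delta").option("timestampAsOf", "2024-01-01").load("/tmp/delta/users")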

Undocumented Feature

__time_travel_source__ looks like an undocumented feature to use for the creationSource.


getTimeTravelVersion is used when:

  • DeltaDataSource is requested to create a BaseRelation for a table scan (to extract the time travel specification)

parsePathIdentifier

parsePathIdentifier(
  spark: SparkSession,
  userPath: String): (Path, Seq[(String, String)], Option[DeltaTimeTravelSpec])

parsePathIdentifier...FIXME


parsePathIdentifier is used when:

  • DeltaTableV2 is requested for metadata (for a non-catalog table)
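
A hedged sketch of a path-encoded time travel identifier that parsePathIdentifier handles (the table path is hypothetical):

spark.read.format("delta").load("/tmp/delta/users@v0") // version 0
// a timestamp-encoded variant (yyyyMMddHHmmssSSS) is also supported, e.g.
// spark.read.format("delta").load("/tmp/delta/users@20240101000000000")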

readChangeFeed

readChangeFeed is the option to enable Change Data Feed batch queries (among the CDF-specific options that createRelation collects).

getMetadataTrackingLogForDeltaSource

getMetadataTrackingLogForDeltaSource(
  spark: SparkSession,
  sourceSnapshot: Snapshot,
  parameters: Map[String, String],
  sourceMetadataPathOpt: Option[String] = None,
  mergeConsecutiveSchemaChanges: Boolean = false): Option[DeltaSourceMetadataTrackingLog]

getMetadataTrackingLogForDeltaSource finds the schema tracking location and, if defined, creates a DeltaSourceMetadataTrackingLog (with the value of the streamingSourceTrackingId option).


getMetadataTrackingLogForDeltaSource throws an UnsupportedOperationException when the spark.databricks.delta.streaming.schemaTracking.enabled configuration property is disabled:

Schema tracking location is not supported for Delta streaming source
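
A hedged sketch of enabling the configuration property before using a schema tracking location (the key is quoted from the exception above):

spark.conf.set("spark.databricks.delta.streaming.schemaTracking.enabled", "true")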

getMetadataTrackingLogForDeltaSource is used when: