DeltaDataSource¶
DeltaDataSource ties Delta Lake and Spark SQL (and Spark Structured Streaming) together as the delta data source that supports batch and streaming queries.
DataSourceRegister and delta Alias¶
DeltaDataSource is a DataSourceRegister (Spark SQL) and registers delta alias.
DeltaDataSource is registered using META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
org.apache.spark.sql.delta.sources.DeltaDataSource
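With the alias registered, Delta tables can be addressed by the short delta format name alone. A minimal sketch (assuming an active SparkSession as spark and a hypothetical table path /tmp/delta/events):

```scala
// Read a Delta table by the registered short name
// (resolved via DataSourceRegister to DeltaDataSource)
val events = spark.read
  .format("delta")
  .load("/tmp/delta/events")
```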
RelationProvider¶
DeltaDataSource is a RelationProvider (Spark SQL).
Creating BaseRelation for Table Scan¶
createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
createRelation is part of the RelationProvider (Spark SQL) abstraction.
createRelation verifies the given parameters (options).
createRelation extracts time travel specification (from the given parameters).
createRelation collects the following CDF-specific options when change data feed is enabled:

- readChangeFeed (with true value)
- startingVersion
- startingTimestamp
- endingVersion
- endingTimestamp
createRelation creates a DeltaTableV2 (with the given parameters as options when spark.databricks.delta.loadFileSystemConfigsFromDataFrameOptions configuration property is enabled).
In the end, createRelation requests the DeltaTableV2 for an insertable HadoopFsRelation.
path Parameter is Required
createRelation makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:
'path' is not specified
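The CDF-specific options above surface in the DataFrameReader API. A sketch of a change-feed batch read (assuming an active SparkSession as spark, a hypothetical table path, and hypothetical version bounds):

```scala
// Batch-read the change data feed between two table versions
val changes = spark.read
  .format("delta")
  .option("readChangeFeed", "true") // enables CDF-specific option handling
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .load("/tmp/delta/events")
```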
CreatableRelationProvider¶
DeltaDataSource is a CreatableRelationProvider (Spark SQL).
Creating BaseRelation after Data Writing¶
createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation
createRelation is part of the CreatableRelationProvider (Spark SQL) abstraction.
createRelation creates a DeltaLog for the required path parameter (from the given parameters) and the given parameters themselves.
createRelation creates a DeltaOptions (with the given parameters and the current SQLConf).
createRelation validateConfigurations (with the delta.-prefixed keys in the given parameters).
createRelation creates and executes a WriteIntoDelta command with the given data.
In the end, createRelation requests the DeltaLog for a BaseRelation.
path Parameter is Required
createRelation makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:
'path' is not specified
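The write path above is what DataFrameWriter.save triggers for the delta format. A sketch (assuming an active SparkSession as spark and a hypothetical table path):

```scala
// Appends the rows and, under the covers, runs the WriteIntoDelta command
spark.range(5).toDF("id")
  .write
  .format("delta")
  .mode("append") // the SaveMode passed to createRelation
  .save("/tmp/delta/events")
```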
StreamSourceProvider¶
DeltaDataSource is a StreamSourceProvider (Spark Structured Streaming).
Creating DeltaSource¶
createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source
createSource creates a DeltaLog for the required path parameter (from the given parameters).
createSource creates a DeltaOptions (with the given parameters and the current SQLConf).
In the end, createSource creates a DeltaSource (with the DeltaLog and the DeltaOptions).
createSource makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:
'path' is not specified
createSource makes sure that there is no schema specified or throws an AnalysisException:
Delta does not support specifying the schema at read time.
createSource makes sure that there is a schema available (in the Snapshot of the DeltaLog) or throws an AnalysisException:
Table schema is not set. Write data into it or use CREATE TABLE to set the schema.
createSource is part of the StreamSourceProvider (Spark Structured Streaming) abstraction.
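createSource is exercised whenever a streaming query reads from a Delta table. A sketch (assuming an active SparkSession as spark and a hypothetical table path):

```scala
// Start a streaming read over a Delta table (createSource builds the DeltaSource)
// Note: no .schema(...) call — Delta rejects a user-specified read schema
val stream = spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
```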
Streaming Source Schema¶
sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType)
sourceSchema creates a DeltaLog for the required path parameter (from the given parameters).
sourceSchema takes the schema (of the Snapshot) of the DeltaLog and removes default expressions.
In the end, sourceSchema returns the delta name with the table schema.
sourceSchema makes sure that there is no schema specified or throws an AnalysisException:
Delta does not support specifying the schema at read time.
sourceSchema makes sure that the path parameter is defined (in the given parameters) or throws an IllegalArgumentException:
'path' is not specified
sourceSchema makes sure that no time travel is specified (neither in the path nor with the time travel options). If either is set, sourceSchema throws an AnalysisException:
Cannot time travel views, subqueries or streams.
sourceSchema is part of the StreamSourceProvider (Spark Structured Streaming) abstraction.
StreamSinkProvider¶
DeltaDataSource is a StreamSinkProvider (Spark Structured Streaming).
DeltaDataSource supports Append and Complete output modes only.
Tip
Consult the demo Using Delta Lake (as Streaming Sink) in Streaming Queries.
Creating Streaming Sink¶
createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink
createSink creates a DeltaOptions (with the given parameters and the current SQLConf).
In the end, createSink creates a DeltaSink (with the required path parameter, the given partitionColumns and the DeltaOptions).
createSink makes sure that there is path parameter defined (in the given parameters) or throws an IllegalArgumentException:
'path' is not specified
createSink makes sure that the given outputMode is either Append or Complete, or throws an IllegalArgumentException:
Data source [dataSource] does not support [outputMode] output mode
createSink is part of the StreamSinkProvider (Spark Structured Streaming) abstraction.
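createSink is exercised on the write side of a streaming query. A sketch (assuming a streaming DataFrame as stream and hypothetical paths):

```scala
// Write a streaming query out to a Delta table (createSink builds the DeltaSink)
val query = stream
  .writeStream
  .format("delta")
  .outputMode("append") // Append and Complete are the only supported modes
  .option("checkpointLocation", "/tmp/delta/events-checkpoint")
  .start("/tmp/delta/events")
```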
TableProvider¶
DeltaDataSource is a TableProvider (Spark SQL).
Loading Delta Table¶
getTable(
schema: StructType,
partitioning: Array[Transform],
properties: Map[String, String]): Table
getTable is part of the TableProvider (Spark SQL) abstraction.
getTable creates a DeltaTableV2 (with the path from the given properties).
getTable throws an IllegalArgumentException when the path option is not specified:
'path' is not specified
Schema Tracking Location¶
extractSchemaTrackingLocationConfig(
spark: SparkSession,
parameters: Map[String, String]): Option[String]
extractSchemaTrackingLocationConfig is the value of the schemaTrackingLocation option (or its schemaLocation alias), if defined.
extractSchemaTrackingLocationConfig is used when:

- DeltaAnalysis is requested to verifyDeltaSourceSchemaLocation
- DeltaDataSource is requested for a DeltaSourceMetadataTrackingLog
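A sketch of wiring a schema tracking location into a streaming read (the schemaTrackingLocation option name is an assumption here, as are spark and the paths):

```scala
// Track schema changes of the Delta streaming source in a dedicated location
val stream = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/delta/_schema_log") // assumed option name
  .load("/tmp/delta/events")
```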
Creating DeltaTimeTravelSpec¶
getTimeTravelVersion(
parameters: Map[String, String]): Option[DeltaTimeTravelSpec]
getTimeTravelVersion reads the following options (from the given parameters):

- timestampAsOf
- versionAsOf
- __time_travel_source__
getTimeTravelVersion creates a DeltaTimeTravelSpec if either timestampAsOf or versionAsOf is defined. The DeltaTimeTravelSpec is created with the creationSource based on __time_travel_source__ (if specified) or defaults to dfReader.
Undocumented Feature
__time_travel_source__ looks like an undocumented feature to use for the creationSource.
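The two documented options map directly onto the DataFrameReader API. A sketch of a time-travel read (assuming an active SparkSession as spark, a hypothetical table path, and that version 1 exists):

```scala
// Time travel with DataFrameReader options (creationSource defaults to dfReader)
val asOfV1 = spark.read
  .format("delta")
  .option("versionAsOf", "1") // alternatively: .option("timestampAsOf", "2024-01-01")
  .load("/tmp/delta/events")
```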
getTimeTravelVersion is used when:

- DeltaDataSource is requested to create a relation (as a RelationProvider)
parsePathIdentifier¶
parsePathIdentifier(
spark: SparkSession,
userPath: String): (Path, Seq[(String, String)], Option[DeltaTimeTravelSpec])
parsePathIdentifier...FIXME
parsePathIdentifier is used when:

- DeltaTableV2 is requested for metadata (for a non-catalog table)
readChangeFeed¶
getMetadataTrackingLogForDeltaSource¶
getMetadataTrackingLogForDeltaSource(
spark: SparkSession,
sourceSnapshot: Snapshot,
parameters: Map[String, String],
sourceMetadataPathOpt: Option[String] = None,
mergeConsecutiveSchemaChanges: Boolean = false): Option[DeltaSourceMetadataTrackingLog]
getMetadataTrackingLogForDeltaSource finds the schema tracking location and, if defined, creates a DeltaSourceMetadataTrackingLog (with the value of the streamingSourceTrackingId option).
getMetadataTrackingLogForDeltaSource throws an UnsupportedOperationException when the spark.databricks.delta.streaming.schemaTracking.enabled configuration property is disabled:
Schema tracking location is not supported for Delta streaming source
getMetadataTrackingLogForDeltaSource is used when:

- DeltaDataSource is requested for a streaming source and the streaming source schema