# DeltaDataSource

`DeltaDataSource` ties Delta Lake and Spark SQL (and Spark Structured Streaming) together as the `delta` data source that supports batch and streaming queries.
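For illustration, both query types use the same `delta` format (assuming an active `SparkSession` as `spark`; the table path is a placeholder):

```scala
// Batch query over a Delta table
val batchDF = spark.read
  .format("delta")
  .load("/tmp/delta/events")

// Streaming query over the same table
val streamDF = spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
```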
## DataSourceRegister and delta Alias

`DeltaDataSource` is a `DataSourceRegister` (Spark SQL) and registers the `delta` alias.

`DeltaDataSource` is registered using `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`:

```text
org.apache.spark.sql.delta.sources.DeltaDataSource
```
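With that registration in place, the short `delta` alias and the fully-qualified class name should be interchangeable in `format` (a sketch; the path is a placeholder):

```scala
// Both statements address the same data source
spark.read.format("delta").load("/tmp/delta/events")
spark.read.format("org.apache.spark.sql.delta.sources.DeltaDataSource").load("/tmp/delta/events")
```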
## RelationProvider

`DeltaDataSource` is a `RelationProvider` (Spark SQL).
### Creating BaseRelation for Table Scan

```scala
createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
```

`createRelation` is part of the `RelationProvider` (Spark SQL) abstraction.

`createRelation` verifies the given `parameters` (options).

`createRelation` extracts the time travel specification (from the given `parameters`).

`createRelation` collects the CDF-specific options when change data feed is enabled:

- `readChangeFeed` (with `true` value)
- `startingVersion`
- `startingTimestamp`
- `endingVersion`
- `endingTimestamp`

`createRelation` creates a DeltaTableV2 (with the given `parameters` as options when the spark.databricks.delta.loadFileSystemConfigsFromDataFrameOptions configuration property is enabled).

In the end, `createRelation` requests the DeltaTableV2 for an insertable HadoopFsRelation.
**path Parameter is Required**

`createRelation` makes sure that the `path` parameter is defined (in the given `parameters`) or throws an `IllegalArgumentException`:

```text
'path' is not specified
```
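A minimal sketch of a batch Change Data Feed read that carries these options (the path is a placeholder and CDF has to be enabled on the table):

```scala
// Reads row-level changes between two table versions
val changes = spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .load("/tmp/delta/events")
```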
## CreatableRelationProvider

`DeltaDataSource` is a `CreatableRelationProvider` (Spark SQL).

### Creating BaseRelation after Data Writing

```scala
createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  data: DataFrame): BaseRelation
```

`createRelation` is part of the `CreatableRelationProvider` (Spark SQL) abstraction.
`createRelation` creates a DeltaLog for the required `path` parameter (from the given `parameters`) and the given `parameters` itself.

`createRelation` creates a DeltaOptions (with the given `parameters` and the current `SQLConf`).

`createRelation` validateConfigurations (with `delta.`-prefixed keys in the given `parameters`).

`createRelation` creates and executes a WriteIntoDelta command with the given `data`.

In the end, `createRelation` requests the DeltaLog for a BaseRelation.
**path Parameter is Required**

`createRelation` makes sure that the `path` parameter is defined (in the given `parameters`) or throws an `IllegalArgumentException`:

```text
'path' is not specified
```
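A path-based batch write exercises this code path, e.g. (the DataFrame `df` and the target path are placeholders):

```scala
// Appends the rows of df to the Delta table at the given path
df.write
  .format("delta")
  .mode("append")
  .save("/tmp/delta/events")
```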
## StreamSourceProvider

`DeltaDataSource` is a `StreamSourceProvider` (Spark Structured Streaming).

### Creating DeltaSource

```scala
createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
```

`createSource` creates a DeltaLog for the required `path` parameter (from the given `parameters`).

`createSource` creates a DeltaOptions (with the given `parameters` and the current `SQLConf`).

In the end, `createSource` creates a DeltaSource (with the DeltaLog and the DeltaOptions).
`createSource` makes sure that the `path` parameter is defined (in the given `parameters`) or throws an `IllegalArgumentException`:

```text
'path' is not specified
```

`createSource` makes sure that there is no `schema` specified or throws an `AnalysisException`:

```text
Delta does not support specifying the schema at read time.
```

`createSource` makes sure that there is a schema available (in the Snapshot of the DeltaLog) or throws an `AnalysisException`:

```text
Table schema is not set. Write data into it or use CREATE TABLE to set the schema.
```
`createSource` is part of the `StreamSourceProvider` (Spark Structured Streaming) abstraction.
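A streaming query over a Delta table path ends up in this code path, e.g. (the path is a placeholder):

```scala
// Uses the Delta table as a streaming source
val events = spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
```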
### Streaming Source Schema

```scala
sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
```

`sourceSchema` creates a DeltaLog for the required `path` parameter (from the given `parameters`).

`sourceSchema` takes the schema (of the Snapshot) of the DeltaLog and removes default expressions.

In the end, `sourceSchema` returns the `delta` name with the table schema.
`sourceSchema` makes sure that there is no `schema` specified or throws an `AnalysisException`:

```text
Delta does not support specifying the schema at read time.
```

`sourceSchema` makes sure that the `path` parameter is defined (in the given `parameters`) or throws an `IllegalArgumentException`:

```text
'path' is not specified
```

`sourceSchema` makes sure that there is no time travel specified using the following:

- Options (the given `parameters`)
- Path

If either is set, `sourceSchema` throws an `AnalysisException`:

```text
Cannot time travel views, subqueries or streams.
```
`sourceSchema` is part of the `StreamSourceProvider` (Spark Structured Streaming) abstraction.
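For example, a user-defined schema on a streaming read should be rejected with the `AnalysisException` above (a sketch; the schema and path are placeholders):

```scala
import org.apache.spark.sql.types._

// Delta infers the schema from the table; specifying one at read time fails
val userSchema = StructType(Seq(StructField("id", LongType)))
spark.readStream
  .format("delta")
  .schema(userSchema) // throws: Delta does not support specifying the schema at read time
  .load("/tmp/delta/events")
```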
## StreamSinkProvider

`DeltaDataSource` is a `StreamSinkProvider` (Spark Structured Streaming).

`DeltaDataSource` supports `Append` and `Complete` output modes only.

**Tip**

Consult the demo Using Delta Lake (as Streaming Sink) in Streaming Queries.

### Creating Streaming Sink

```scala
createSink(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  partitionColumns: Seq[String],
  outputMode: OutputMode): Sink
```

`createSink` creates a DeltaOptions (with the given `parameters` and the current `SQLConf`).

In the end, `createSink` creates a DeltaSink (with the required `path` parameter, the given `partitionColumns`, and the DeltaOptions).
`createSink` makes sure that the `path` parameter is defined (in the given `parameters`) or throws an `IllegalArgumentException`:

```text
'path' is not specified
```

`createSink` makes sure that the given `outputMode` is either `Append` or `Complete`, or throws an `IllegalArgumentException`:

```text
Data source [dataSource] does not support [outputMode] output mode
```
`createSink` is part of the `StreamSinkProvider` (Spark Structured Streaming) abstraction.
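A streaming write to a Delta path goes through `createSink`, e.g. (the input DataFrame `events` and the paths are placeholders):

```scala
// Appends micro-batches to the Delta table at the target path
val query = events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/events")
  .start("/tmp/delta/events")
```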
## TableProvider

`DeltaDataSource` is a `TableProvider` (Spark SQL).

### Loading Delta Table

```scala
getTable(
  schema: StructType,
  partitioning: Array[Transform],
  properties: Map[String, String]): Table
```

`getTable` is part of the `TableProvider` (Spark SQL) abstraction.

`getTable` creates a DeltaTableV2 (with the path from the given `properties`).

`getTable` throws an `IllegalArgumentException` when the `path` option is not specified:

```text
'path' is not specified
```
## Schema Tracking Location

```scala
extractSchemaTrackingLocationConfig(
  spark: SparkSession,
  parameters: Map[String, String]): Option[String]
```

`extractSchemaTrackingLocationConfig` is the value of the schema tracking location option (if defined) in the given `parameters`.

`extractSchemaTrackingLocationConfig` is used when:

- `DeltaAnalysis` is requested to verifyDeltaSourceSchemaLocation
- `DeltaDataSource` is requested for a DeltaSourceMetadataTrackingLog
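A sketch of how the option might be set on a streaming read (the option name `schemaTrackingLocation` and the paths are assumptions to verify against your Delta Lake version):

```scala
// Asks the Delta streaming source to track schema changes under a dedicated location
val stream = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/delta/_schema_log/events")
  .load("/tmp/delta/events")
```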
## Creating DeltaTimeTravelSpec

```scala
getTimeTravelVersion(
  parameters: Map[String, String]): Option[DeltaTimeTravelSpec]
```

`getTimeTravelVersion` reads the following options (from the given `parameters`):

- `timestampAsOf`
- `versionAsOf`
- `__time_travel_source__`

`getTimeTravelVersion` creates a DeltaTimeTravelSpec if either `timestampAsOf` or `versionAsOf` is defined. The `DeltaTimeTravelSpec` is created with the creationSource based on `__time_travel_source__` (if specified) or defaults to `dfReader`.

**Undocumented Feature**

`__time_travel_source__` looks like an undocumented feature to use for the creationSource.

`getTimeTravelVersion` is used when:

- `DeltaDataSource` is requested to create a relation (as a RelationProvider)
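For example, a time-travel batch read makes `getTimeTravelVersion` return a `DeltaTimeTravelSpec` (the path and version are placeholders):

```scala
// Pins the batch read to version 5 of the table
val snapshotAtV5 = spark.read
  .format("delta")
  .option("versionAsOf", 5)
  .load("/tmp/delta/events")
```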
## parsePathIdentifier

```scala
parsePathIdentifier(
  spark: SparkSession,
  userPath: String): (Path, Seq[(String, String)], Option[DeltaTimeTravelSpec])
```

`parsePathIdentifier`...FIXME

`parsePathIdentifier` is used when:

- `DeltaTableV2` is requested for metadata (for a non-catalog table)
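`parsePathIdentifier` appears to back the `@`-suffixed time travel syntax in paths, e.g. (a sketch under that assumption; the path is a placeholder):

```scala
// Time travel encoded in the path itself (version 5 of the table)
val snapshotAtV5 = spark.read
  .format("delta")
  .load("/tmp/delta/events@v5")
```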
## readChangeFeed
## getMetadataTrackingLogForDeltaSource

```scala
getMetadataTrackingLogForDeltaSource(
  spark: SparkSession,
  sourceSnapshot: Snapshot,
  parameters: Map[String, String],
  sourceMetadataPathOpt: Option[String] = None,
  mergeConsecutiveSchemaChanges: Boolean = false): Option[DeltaSourceMetadataTrackingLog]
```

`getMetadataTrackingLogForDeltaSource` finds the schema tracking location and, if defined, creates a DeltaSourceMetadataTrackingLog (with the value of the streamingSourceTrackingId option).

`getMetadataTrackingLogForDeltaSource` reports an `UnsupportedOperationException` when spark.databricks.delta.streaming.schemaTracking.enabled is disabled:

```text
Schema tracking location is not supported for Delta streaming source
```

`getMetadataTrackingLogForDeltaSource` is used when:

- `DeltaDataSource` is requested for a streaming source and the streaming source schema