DataSource — Pluggable Data Provider Framework¶
DataSource paves the way for Pluggable Data Provider Framework in Spark SQL.
Together with the provider interfaces, DataSource allows Spark SQL integrators to use external data systems as data sources and sinks in structured queries (incl. Spark Structured Streaming).
Provider Interfaces¶
- CreatableRelationProvider
- FileFormat
- RelationProvider
- SchemaRelationProvider
- StreamSinkProvider (Spark Structured Streaming)
- StreamSourceProvider (Spark Structured Streaming)
Accessing DataSource¶
DataSource is available using DataFrameReader (and DataStreamReader in Spark Structured Streaming).
```scala
val people = spark
  .read // Batch reading
  .format("csv")
  .load("people.csv")

val messages = spark
  .readStream // Streamed reading
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
```
Creating Instance¶
DataSource takes the following to be created:
- SparkSession
- Fully-qualified class name or an alias of the data source provider (aka data source format)
- Data Paths (default: empty)
- User-specified schema (default: undefined)
- Names of the partition columns (default: empty)
- Bucketing specification (default: undefined)
- Options (default: empty)
- CatalogTable (default: undefined)
Note
Only a SparkSession and a fully-qualified class name of the data source provider are required to create an instance of DataSource.
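For illustration, here is a minimal sketch of creating a DataSource directly (it is an internal API; the provider alias, path and option below are illustrative only):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// A minimal sketch (internal API): only the SparkSession and the provider's
// class name (or alias) are required; every other argument keeps its default.
val dataSource = DataSource(
  sparkSession = spark,
  className = "csv",
  paths = Seq("people.csv"),
  options = Map("header" -> "true"))
```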
DataSource is created when:
- `HiveMetastoreCatalog` is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation
- `DataFrameReader` is requested to load data from a data source (Data Source V1)
- `DataFrameWriter` is requested to save to a data source (Data Source V1)
- CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand and CreateTempViewUsing commands are executed
- FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
- For Spark Structured Streaming's `FileStreamSource`, `DataStreamReader` and `DataStreamWriter`
Data Source Resolution¶
DataSource is given an alias or a fully-qualified class name of the data source provider. DataSource uses the name to load the Java class. In the end, DataSource uses the Java class to resolve a relation to represent the data source in logical plans.
Resolving Relation¶
```scala
resolveRelation(
  checkFilesExist: Boolean = true): BaseRelation
```
resolveRelation resolves (creates) a BaseRelation.
Internally, resolveRelation creates an instance of the class (of the provider) and branches off based on its type and whether the user-defined schema was specified or not (see the sketch after the table below).
| Provider | Behaviour |
|---|---|
| SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema |
| RelationProvider | Executes RelationProvider.createRelation |
| FileFormat | Creates a HadoopFsRelation |
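The following is a simplified sketch, not the actual Spark code, of the dispatch summarized in the table; `provider`, `options` and `userSpecifiedSchema` stand in for DataSource's internal state:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

// Simplified sketch of resolveRelation's dispatch (FileFormat handling omitted).
def resolveRelationSketch(
    sqlContext: SQLContext,
    provider: Any,
    options: Map[String, String],
    userSpecifiedSchema: Option[StructType]): BaseRelation =
  (provider, userSpecifiedSchema) match {
    case (p: SchemaRelationProvider, Some(schema)) =>
      p.createRelation(sqlContext, options, schema) // user-specified schema passed along
    case (p: RelationProvider, None) =>
      p.createRelation(sqlContext, options) // schema inferred by the data source itself
    case _ =>
      // FileFormat providers end up as a HadoopFsRelation (path and schema resolution omitted)
      throw new UnsupportedOperationException("FileFormat handling omitted in this sketch")
  }
```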
resolveRelation is used when:
- `DataSource` is requested to write and read the result of a structured query (only when the class is a FileFormat)
- `DataFrameReader` is requested to load data from a data source that supports multiple paths
- `TextInputCSVDataSource` and `TextInputJsonDataSource` are requested to infer schema
- CreateDataSourceTableCommand logical command is executed
- CreateTempViewUsing logical command is executed
- `FindDataSourceTable` is requested to `readDataSourceTable`
- `ResolveSQLOnFile` is requested to convert a logical plan (when the class is a FileFormat)
- `HiveMetastoreCatalog` is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation
Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources)¶
```scala
planForWriting(
  mode: SaveMode,
  data: LogicalPlan): LogicalPlan
```
planForWriting creates an instance of the providingClass and branches off per type as follows (see the sketch after this list):

- For a CreatableRelationProvider, `planForWriting` creates a SaveIntoDataSourceCommand (with the input `data` and `mode` and the `CreatableRelationProvider` data source)
- For a FileFormat, `planForWriting` uses planForWritingFileFormat (with the `FileFormat` format and the input `mode` and `data`)
- For other types, `planForWriting` simply throws a `RuntimeException`: `[providingClass] does not allow create table as select.`
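A schematic sketch (not the actual Spark code) of that dispatch, mapping the provider type to the logical command produced; `provider` and `providerClassName` are hypothetical inputs:

```scala
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.sources.CreatableRelationProvider

// Schematic sketch: map the provider type to the name of the logical command produced.
def writeCommandFor(provider: Any, providerClassName: String): String = provider match {
  case _: CreatableRelationProvider => "SaveIntoDataSourceCommand"
  case _: FileFormat                => "InsertIntoHadoopFsRelationCommand" // via planForWritingFileFormat
  case _ =>
    throw new RuntimeException(s"$providerClassName does not allow create table as select.")
}
```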
planForWriting is used when:
- `DataFrameWriter` is requested to save (to a data source V1)
- InsertIntoDataSourceDirCommand logical command is executed
Writing Data to Data Source (per Save Mode) Followed by Reading Rows Back (as BaseRelation)¶
```scala
writeAndRead(
  mode: SaveMode,
  data: LogicalPlan,
  outputColumnNames: Seq[String],
  physicalPlan: SparkPlan): BaseRelation
```
writeAndRead...FIXME
Note
writeAndRead is also known as Create Table As Select (CTAS) query.
writeAndRead is used when CreateDataSourceTableAsSelectCommand logical command is executed.
Planning for Writing (to FileFormat-Based Data Source)¶
```scala
planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand
```
planForWritingFileFormat combines the paths with the path option (from the caseInsensitiveOptions) and, assuming that exactly one path results, creates a fully-qualified, HDFS-compatible output path for writing.
Note
planForWritingFileFormat uses the Hadoop Path of the output location to request the FileSystem that owns it (using a Hadoop Configuration).
planForWritingFileFormat validates partition columns in the given partitionColumns.
In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
planForWritingFileFormat throws an IllegalArgumentException when more than one path is specified:
Expected exactly one path to be specified, but got: [allPaths]
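A minimal sketch of that path handling, assuming hypothetical `paths`, `options` and `hadoopConf` inputs (not the actual Spark code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Combine the explicit paths with the "path" option, require exactly one,
// and qualify it against the FileSystem that owns it.
def qualifiedOutputPath(
    paths: Seq[String],
    options: Map[String, String],
    hadoopConf: Configuration): Path = {
  val allPaths = paths ++ options.get("path")
  if (allPaths.length != 1) {
    throw new IllegalArgumentException(
      s"Expected exactly one path to be specified, but got: ${allPaths.mkString(", ")}")
  }
  val outputPath = new Path(allPaths.head)
  val fs = outputPath.getFileSystem(hadoopConf)
  outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
```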
planForWritingFileFormat is used when DataSource is requested for the following:
- Writing data to a data source followed by "reading" rows back (for CreateDataSourceTableAsSelectCommand logical command)
- Creating a logical command for writing (for InsertIntoDataSourceDirCommand logical command and DataFrameWriter.save operator with DataSource V1 data sources)
Data Source Class¶
```scala
providingClass: Class[_]
```
java.lang.Class that was loaded for the given data source provider
providingClass is used when:
- InsertIntoDataSourceDirCommand logical command is executed (to ensure working with a FileFormat-based data source)
- ResolveSQLOnFile logical evaluation rule is executed (to ensure working with a FileFormat-based data source)
- `DataSource` is requested for providingInstance
Data Source Instance¶
```scala
providingInstance(): Any
```
providingInstance simply creates an instance of the Java class of the data source.
providingInstance is used when:
- `DataSource` is requested to `sourceSchema`, `createSource`, `createSink`, resolve a relation, write and read, and plan for writing
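What this amounts to is reflective instantiation of the loaded class; a hedged sketch (the "parquet" alias is illustrative only):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// Look up the provider class and instantiate it reflectively,
// which is essentially what providingInstance does.
val cls = DataSource.lookupDataSource("parquet", spark.sessionState.conf)
val instance = cls.getConstructor().newInstance()
```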
Utilities¶
Looking up TableProvider¶
```scala
lookupDataSourceV2(
  provider: String,
  conf: SQLConf): Option[TableProvider]
```
lookupDataSourceV2 uses the spark.sql.sources.useV1SourceList configuration property to determine the data sources for which the V1 code path should be used.
lookupDataSourceV2 loads up the class of the input provider.
lookupDataSourceV2 branches off based on the type of the data source and returns (in that order):
- `None` for a DataSourceRegister with the short name among the "useV1SourceList" data source names
- A TableProvider when the canonical name of the class is not among the "useV1SourceList" data source names
- `None` for other cases
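A hedged usage sketch of that behaviour (assuming the default useV1SourceList, which includes the built-in file sources such as csv):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// Built-in file sources on spark.sql.sources.useV1SourceList (e.g. csv by default)
// resolve to None; other providers may yield a TableProvider.
val maybeTableProvider = DataSource.lookupDataSourceV2("csv", spark.sessionState.conf)
```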
lookupDataSourceV2 is used when:
- `DataFrameReader` is requested to load a DataFrame
- `DataFrameWriter` is requested to look up a TableProvider
- ResolveSessionCatalog logical extended resolution rule is executed
Loading Java Class Of Data Source Provider¶
```scala
lookupDataSource(
  provider: String,
  conf: SQLConf): Class[_]
```
lookupDataSource first finds the given provider in the backwardCompatibilityMap internal registry, and falls back to the provider name itself when not found.
Note
The provider argument can be either an alias (a simple name, e.g. parquet) or a fully-qualified class name (e.g. org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat).
lookupDataSource then uses the given SQLConf to decide on the class name of the provider for ORC and Avro data sources as follows:
- For the `orc` provider and `native`, `lookupDataSource` uses...FIXME
- For the `orc` provider and `hive`, `lookupDataSource` uses `org.apache.spark.sql.hive.orc.OrcFileFormat`
- For `com.databricks.spark.avro` and spark.sql.legacy.replaceDatabricksSparkAvro.enabled configuration enabled (default), `lookupDataSource` uses the built-in (but external) Avro data source module
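The `native`/`hive` choice for the `orc` alias comes from the spark.sql.orc.impl configuration property; a short sketch of selecting the native implementation explicitly:

```scala
// Select the ORC implementation used when resolving the "orc" alias ("native" or "hive").
spark.conf.set("spark.sql.orc.impl", "native")
```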
lookupDataSource also uses `[provider1].DefaultSource` (the provider name with a DefaultSource suffix) as another provider class name variant.
lookupDataSource uses Java's ServiceLoader service-provider loading facility to find all data source providers of type DataSourceRegister on the Spark CLASSPATH.
lookupDataSource tries to find the DataSourceRegister provider classes (by their alias) that match the provider name (case-insensitive, e.g. parquet or kafka).
If a single DataSourceRegister provider class is found, lookupDataSource simply returns the class of that data source provider.
If no DataSourceRegister provider class could be found by the short name (alias), lookupDataSource tries to load the provider name as a fully-qualified class name. If that is not successful, lookupDataSource tries to load the other provider name variant (aka DefaultSource) instead.
Note
DataFrameWriter.format and DataFrameReader.format methods accept the name of the data source provider to use as an alias or a fully-qualified class name.
```scala
import org.apache.spark.sql.execution.datasources.DataSource
val source = "parquet"
val cls = DataSource.lookupDataSource(source, spark.sessionState.conf)
```
lookupDataSource is used when:
- `SparkSession` is requested to `executeCommand`
- `CreateTableLikeCommand` and AlterTableAddColumnsCommand runnable commands are executed
- `DataSource` is requested for providingClass and to lookupDataSourceV2
- PreprocessTableCreation posthoc logical resolution rule is executed
- `DataStreamReader` (Spark Structured Streaming) is requested to `load`
- `DataStreamWriter` (Spark Structured Streaming) is requested to `start` a streaming query
Creating InMemoryFileIndex¶
```scala
createInMemoryFileIndex(
  globbedPaths: Seq[Path]): InMemoryFileIndex
```
createInMemoryFileIndex creates an InMemoryFileIndex with the following:
| Property | Value |
|---|---|
| Root Paths | The given globbedPaths |
| Parameters | Options |
| User-defined schema | User-specified schema |
| FileStatusCache | FileStatusCache |
createInMemoryFileIndex is used when DataSource is requested for the following:
- Resolve a BaseRelation (for non-streaming FileFormat-based data sources)
- Source Schema (for FileFormat-based data sources)
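For illustration, a hedged sketch of building an InMemoryFileIndex directly with the ingredients from the table above (these are internal APIs; the path is illustrative only):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

// Root (globbed) paths, options, optional user-specified schema and a shared FileStatusCache.
val fileIndex = new InMemoryFileIndex(
  spark,                              // SparkSession
  Seq(new Path("people.csv")),        // root paths
  Map.empty[String, String],          // parameters (options)
  None,                               // user-specified schema
  FileStatusCache.getOrCreate(spark)) // file status cache
```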