DataSource — Pluggable Data Provider Framework¶
DataSource paves the way for the Pluggable Data Provider Framework in Spark SQL.

Together with the provider interfaces, DataSource allows Spark SQL integrators to use external data systems as data sources and sinks in structured queries (incl. Spark Structured Streaming).
Provider Interfaces¶
- CreatableRelationProvider
- FileFormat
- RelationProvider
- SchemaRelationProvider
- StreamSinkProvider (Spark Structured Streaming)
- StreamSourceProvider (Spark Structured Streaming)
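For illustration only, here is a minimal sketch of a custom RelationProvider (the class name, package, schema and data are made up, not part of Spark) that returns a single-column relation:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical provider; Spark can resolve it as [package].DefaultSource
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val ctx = sqlContext
    new BaseRelation with TableScan {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType =
        StructType(StructField("id", LongType, nullable = false) :: Nil)
      override def buildScan(): RDD[Row] =
        ctx.sparkContext.parallelize(0L until 5L).map(Row(_))
    }
  }
}
```

Assuming the class above lives in a com.example.numbers package, it could be used with spark.read.format("com.example.numbers").load().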
Accessing DataSource¶
DataSource is available using DataFrameReader.
val people = spark
.read // Batch reading
.format("csv")
.load("people.csv")
val messages = spark
.readStream // Streamed reading
.format("kafka")
.option("subscribe", "topic")
.option("kafka.bootstrap.servers", "localhost:9092")
.load
Creating Instance¶
DataSource takes the following to be created:
- SparkSession
- Fully-qualified class name or an alias of the data source provider (aka data source format)
- Data Paths (default: empty)
- User-specified schema (default: undefined)
- Names of the partition columns (default: empty)
- Bucketing specification (default: undefined)
- Options (default: empty)
- CatalogTable (default: undefined)
Note
Only a SparkSession and a fully-qualified class name of the data source provider are required to create an instance of DataSource.
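A minimal sketch of creating a DataSource directly (an internal API; the alias and path below are illustrative):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// Only the SparkSession and className are strictly required
val ds = DataSource(
  sparkSession = spark,
  className = "parquet",
  paths = Seq("/tmp/people.parquet"))
```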
DataSource is created when:

- HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation
- DataFrameReader is requested to load data from a data source (Data Source V1)
- DataFrameWriter is requested to save to a data source (Data Source V1)
- CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand, CreateTempViewUsing commands are executed
- FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
- For Spark Structured Streaming's FileStreamSource, DataStreamReader and DataStreamWriter
Data Source Resolution¶
DataSource is given an alias or a fully-qualified class name of the data source provider. DataSource uses the name to load the Java class. In the end, DataSource uses the Java class to resolve a relation to represent the data source in logical plans.
Resolving Relation¶
resolveRelation(
checkFilesExist: Boolean = true): BaseRelation
resolveRelation resolves (creates) a BaseRelation.

Internally, resolveRelation creates an instance of the class (of the provider) and branches off based on its type and whether the user-defined schema was specified or not.
| Provider | Behaviour |
|---|---|
| SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema |
| RelationProvider | Executes RelationProvider.createRelation |
| FileFormat | Creates a HadoopFsRelation |
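A hedged sketch of resolving a relation for a FileFormat-based data source (the path is illustrative and assumed to contain parquet files):

```scala
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

// For the parquet alias (a FileFormat), resolveRelation should give a HadoopFsRelation
val ds = DataSource(spark, className = "parquet", paths = Seq("/tmp/people.parquet"))
val relation: BaseRelation = ds.resolveRelation()
```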
resolveRelation is used when:

- DataSource is requested to write and read the result of a structured query (only when the class is a FileFormat)
- DataFrameReader is requested to load data from a data source that supports multiple paths
- TextInputCSVDataSource and TextInputJsonDataSource are requested to infer schema
- CreateDataSourceTableCommand logical command is executed
- CreateTempViewUsing logical command is executed
- FindDataSourceTable is requested to readDataSourceTable
- ResolveSQLOnFile is requested to convert a logical plan (when the class is a FileFormat)
- HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation over a HadoopFsRelation
Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources)¶
planForWriting(
mode: SaveMode,
data: LogicalPlan): LogicalPlan
planForWriting creates an instance of the providingClass and branches off per its type as follows:

- For a CreatableRelationProvider, planForWriting creates a SaveIntoDataSourceCommand (with the input data and mode and the CreatableRelationProvider data source)
- For a FileFormat, planForWriting uses planForWritingFileFormat (with the FileFormat format and the input mode and data)
- For other types, planForWriting simply throws a RuntimeException:

  [providingClass] does not allow create table as select.
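A sketch of what the branching means in practice (the path is illustrative; for a FileFormat-based source like parquet the resulting plan should be an InsertIntoHadoopFsRelationCommand):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.DataSource

val df = spark.range(5).toDF("id")
val ds = DataSource(spark, className = "parquet", paths = Seq("/tmp/ids"))
// Only builds the logical command; nothing is written yet
val writePlan = ds.planForWriting(SaveMode.Overwrite, df.queryExecution.analyzed)
```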
planForWriting is used when:

- DataFrameWriter is requested to save (to a Data Source V1)
- InsertIntoDataSourceDirCommand logical command is executed
Writing Data to Data Source (per Save Mode) Followed by Reading Rows Back (as BaseRelation)¶
writeAndRead(
mode: SaveMode,
data: LogicalPlan,
outputColumnNames: Seq[String],
physicalPlan: SparkPlan): BaseRelation
writeAndRead ...FIXME

Note
writeAndRead is also known as a Create Table As Select (CTAS) query.

writeAndRead is used when the CreateDataSourceTableAsSelectCommand logical command is executed.
Planning for Writing (to FileFormat-Based Data Source)¶
planForWritingFileFormat(
format: FileFormat,
mode: SaveMode,
data: LogicalPlan): InsertIntoHadoopFsRelationCommand
planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified, HDFS-compatible output path for writing.

Note
planForWritingFileFormat uses the Hadoop Path to request the FileSystem that owns it (using a Hadoop Configuration).
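A sketch of the equivalent Hadoop API calls for making an output path fully-qualified (the path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val outputPath = new Path("/tmp/people")
// The FileSystem that owns the path, given the Hadoop configuration
val fs = outputPath.getFileSystem(hadoopConf)
// Fully-qualified, HDFS-compatible version of the path
val qualified = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
```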
planForWritingFileFormat validates partition columns in the given partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.

planForWritingFileFormat throws an IllegalArgumentException when more than one path is specified:

Expected exactly one path to be specified, but got: [allPaths]
planForWritingFileFormat is used when DataSource is requested for the following:

- Writing data to a data source followed by "reading" rows back (for CreateDataSourceTableAsSelectCommand logical command)
- Creating a logical command for writing (for InsertIntoDataSourceDirCommand logical command and DataFrameWriter.save operator with Data Source V1 data sources)
Data Source Class¶
providingClass: Class[_]
java.lang.Class that was loaded for the given data source provider
providingClass is used when:

- InsertIntoDataSourceDirCommand logical command is executed (to ensure working with a FileFormat-based data source)
- ResolveSQLOnFile logical evaluation rule is executed (to ensure working with a FileFormat-based data source)
- DataSource is requested for providingInstance
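A short, illustrative way to inspect providingClass for a given alias:

```scala
import org.apache.spark.sql.execution.datasources.DataSource

val ds = DataSource(spark, className = "parquet")
// e.g. class org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
println(ds.providingClass)
```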
Data Source Instance¶
providingInstance(): Any
providingInstance simply creates an instance of the Java class of the data source.

providingInstance is used when:

- DataSource is requested to sourceSchema, createSource, createSink, resolve a relation, write and read, and plan for writing
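Conceptually (a sketch, not the actual Spark code), this is roughly equivalent to instantiating the no-argument constructor of the loaded provider class:

```scala
import org.apache.spark.sql.execution.datasources.DataSource

val conf = spark.sessionState.conf
val cls = DataSource.lookupDataSource("parquet", conf)
// Roughly what providingInstance does for the loaded provider class
val instance = cls.getConstructor().newInstance()
```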
Utilities¶
Looking up TableProvider¶
lookupDataSourceV2(
provider: String,
conf: SQLConf): Option[TableProvider]
lookupDataSourceV2 uses the spark.sql.sources.useV1SourceList configuration property for the data sources for which to use the V1 version.

lookupDataSourceV2 loads up the class of the input provider.

lookupDataSourceV2 branches off based on the type of the data source and returns (in that order):

- None for a DataSourceRegister with the short name among the "useV1SourceList" data source names
- A TableProvider when the canonical name of the class is not among the "useV1SourceList" data source names
- None for other cases
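A hedged illustration of the lookup (the actual result depends on the value of spark.sql.sources.useV1SourceList in the current session):

```scala
import org.apache.spark.sql.execution.datasources.DataSource

val conf = spark.sessionState.conf
// None when the format's short name (e.g. parquet) is listed in spark.sql.sources.useV1SourceList
val maybeTableProvider = DataSource.lookupDataSourceV2("parquet", conf)
```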
lookupDataSourceV2 is used when:

- DataFrameReader is requested to load a DataFrame
- DataFrameWriter is requested to look up a TableProvider
- ResolveSessionCatalog logical extended resolution rule is executed
Loading Java Class Of Data Source Provider¶
lookupDataSource(
provider: String,
conf: SQLConf): Class[_]
lookupDataSource first finds the given provider in the backwardCompatibilityMap internal registry, and falls back to the provider name itself when not found.
Note
The provider argument can be either an alias (a simple name, e.g. parquet) or a fully-qualified class name (e.g. org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat).
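For example (the path is illustrative), the following two reads should resolve to the same provider class:

```scala
// Using the alias
spark.read.format("parquet").load("/tmp/people.parquet")

// Using the fully-qualified class name of the provider
spark.read
  .format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat")
  .load("/tmp/people.parquet")
```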
lookupDataSource then uses the given SQLConf to decide on the class name of the provider for ORC and Avro data sources as follows:

- For the orc provider and native, lookupDataSource uses ...FIXME
- For the orc provider and hive, lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat
- For com.databricks.spark.avro with the spark.sql.legacy.replaceDatabricksSparkAvro.enabled configuration property enabled (default), lookupDataSource uses the built-in (but external) Avro data source module
lookupDataSource also uses DefaultSource as another class name variant of the provider (i.e. [provider].DefaultSource).
lookupDataSource uses Java's ServiceLoader service-provider loading facility to find all data source providers of type DataSourceRegister on the Spark CLASSPATH.

lookupDataSource tries to find the DataSourceRegister provider classes (by their alias) that match the provider name (case-insensitive, e.g. parquet or kafka).
If a single DataSourceRegister provider class is found, lookupDataSource simply returns the instance of the data source provider.

If no DataSourceRegister provider class could be found by the short name (alias), lookupDataSource tries to load the provider name as a fully-qualified class name. If not successful, lookupDataSource tries to load the other provider name variant (aka DefaultSource) instead.
Note
DataFrameWriter.format and DataFrameReader.format methods accept the name of the data source provider to use as an alias or a fully-qualified class name.
import org.apache.spark.sql.execution.datasources.DataSource
val source = "parquet"
val cls = DataSource.lookupDataSource(source, spark.sessionState.conf)
lookupDataSource is used when:

- SparkSession is requested to executeCommand
- CreateTableLikeCommand and AlterTableAddColumnsCommand runnable commands are executed
- DataSource is requested for providingClass and to lookupDataSourceV2
- PreprocessTableCreation posthoc logical resolution rule is executed
- DataStreamReader (Spark Structured Streaming) is requested to load
- DataStreamWriter (Spark Structured Streaming) is requested to start a streaming query
Creating InMemoryFileIndex¶
createInMemoryFileIndex(
globbedPaths: Seq[Path]): InMemoryFileIndex
createInMemoryFileIndex creates an InMemoryFileIndex with the following:

| Property | Value |
|---|---|
| Root Paths | The given globbedPaths |
| Parameters | Options |
| User-defined schema | User-specified schema |
| FileStatusCache | FileStatusCache |
createInMemoryFileIndex is used when DataSource is requested for the following:
- Resolve a BaseRelation (for non-streaming FileFormat-based data sources)
- Source Schema (for FileFormat-based data sources)
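A sketch of the equivalent construction (constructor parameters may differ slightly across Spark versions; the path is illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

val index = new InMemoryFileIndex(
  spark,                               // SparkSession
  Seq(new Path("/tmp/people")),        // root paths (the given globbedPaths)
  Map.empty[String, String],           // parameters (options)
  None,                                // user-specified schema
  FileStatusCache.getOrCreate(spark))  // FileStatusCache
```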