
DataSource — Pluggable Data Provider Framework

DataSource paves the way for the Pluggable Data Provider Framework in Spark SQL.

Together with the provider interfaces, DataSource allows Spark SQL integrators to use external data systems as data sources and sinks in structured queries (incl. Spark Structured Streaming).

Provider Interfaces

Accessing DataSource

DataSource is available using DataFrameReader.

val people = spark
  .read // Batch reading
  .format("csv")
  .load("people.csv")
val messages = spark
  .readStream // Streamed reading
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load

Creating Instance

DataSource takes the following to be created:

  • SparkSession
  • Fully-qualified class name or an alias of the data source provider (aka data source format)
  • Data Paths (default: empty)
  • User-specified schema (default: undefined)
  • Names of the partition columns (default: empty)
  • Bucketing specification (default: undefined)
  • Options (default: empty)
  • CatalogTable (default: undefined)

Note

Only a SparkSession and a fully-qualified class name of the data source provider are required to create an instance of DataSource.
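
A minimal sketch of creating a DataSource directly (DataSource is an internal Spark SQL API, so the constructor shape and parameter names below are assumptions based on the list above and may differ across Spark versions):

import org.apache.spark.sql.execution.datasources.DataSource

val dataSource = DataSource(
  sparkSession = spark,
  className = "csv",                 // alias or fully-qualified class name of the provider
  paths = Seq("people.csv"),
  options = Map("header" -> "true"))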

DataSource is created when:

Data Source Resolution

DataSource is given an alias or a fully-qualified class name of the data source provider and uses that name to load the provider's Java class. In the end, DataSource uses the Java class to resolve a relation that represents the data source in logical plans.

Resolving Relation

resolveRelation(
  checkFilesExist: Boolean = true): BaseRelation

resolveRelation resolves (creates) a BaseRelation.

Internally, resolveRelation creates an instance of the provider class and branches off based on its type and whether a user-specified schema was given:

  • SchemaRelationProvider: Executes SchemaRelationProvider.createRelation with the provided schema
  • RelationProvider: Executes RelationProvider.createRelation
  • FileFormat: Creates a HadoopFsRelation
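
A simplified sketch of that branching (the provider types and createRelation signatures follow this page; the FileFormat branch is abbreviated and createHadoopFsRelation is a hypothetical helper standing in for the inline code):

(providingInstance(), userSpecifiedSchema) match {
  case (provider: SchemaRelationProvider, Some(schema)) =>
    provider.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
  case (provider: RelationProvider, None) =>
    provider.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
  case (format: FileFormat, _) =>
    // builds a HadoopFsRelation from a file index over the paths, the (inferred or
    // user-specified) schema, and the partitioning and bucketing specifications
    createHadoopFsRelation(format) // hypothetical helper
}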

resolveRelation is used when:

Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources)

planForWriting(
  mode: SaveMode,
  data: LogicalPlan): LogicalPlan

planForWriting creates an instance of the providingClass and branches off per type as follows:
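
A hedged sketch of the two branches (the command names come from Spark SQL; schema validation and other checks are omitted):

providingInstance() match {
  case provider: CreatableRelationProvider =>
    // logical command that will execute CreatableRelationProvider.createRelation
    SaveIntoDataSourceCommand(data, provider, caseInsensitiveOptions, mode)
  case format: FileFormat =>
    planForWritingFileFormat(format, mode, data)
  case other =>
    // other provider types cannot be written to this way
    sys.error(s"${other.getClass.getCanonicalName} does not allow create table as select")
}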

planForWriting is used when:

Writing Data to Data Source (per Save Mode) Followed by Reading Rows Back (as BaseRelation)

writeAndRead(
  mode: SaveMode,
  data: LogicalPlan,
  outputColumnNames: Seq[String],
  physicalPlan: SparkPlan): BaseRelation

writeAndRead...FIXME

Note

writeAndRead is also known as Create Table As Select (CTAS) query.

writeAndRead is used when CreateDataSourceTableAsSelectCommand logical command is executed.

Planning for Writing (to FileFormat-Based Data Source)

planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand

planForWritingFileFormat combines the paths with the path option (from caseInsensitiveOptions) and, provided that exactly one path results, creates a fully-qualified, HDFS-compatible output path for writing.

Note

planForWritingFileFormat uses the Hadoop Path to request the FileSystem that owns it (using a Hadoop Configuration).

planForWritingFileFormat validates partition columns in the given partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.

planForWritingFileFormat throws an IllegalArgumentException when more than one path is specified:

Expected exactly one path to be specified, but got: [allPaths]
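
A rough sketch of the path handling described above (assumed shape; hadoopConf stands for a Hadoop Configuration resolved from the session state):

import org.apache.hadoop.fs.Path

val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
  val path = new Path(allPaths.head)
  val fs = path.getFileSystem(hadoopConf)
  // fully-qualified, HDFS-compatible output path
  path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
  throw new IllegalArgumentException(
    s"Expected exactly one path to be specified, but got: ${allPaths.mkString(", ")}")
}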

planForWritingFileFormat is used when DataSource is requested for the following:

Data Source Class

providingClass: Class[_]

The java.lang.Class that was loaded for the given data source provider

providingClass is used when:

  • InsertIntoDataSourceDirCommand logical command is executed (to ensure working with a FileFormat-based data source)
  • ResolveSQLOnFile logical evaluation rule is executed (to ensure working with a FileFormat-based data source)
  • DataSource is requested for providingInstance

Data Source Instance

providingInstance(): Any

providingInstance simply creates an instance of the Java class of the data source.
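
A minimal sketch (assumed; the actual code may special-case providers that require constructor arguments):

def providingInstance(): Any = providingClass.getDeclaredConstructor().newInstance()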

providingInstance is used when:

Utilities

Looking up TableProvider

lookupDataSourceV2(
  provider: String,
  conf: SQLConf): Option[TableProvider]

lookupDataSourceV2 uses the spark.sql.sources.useV1SourceList configuration property to determine the data sources for which the V1 code path should be used.

lookupDataSourceV2 loads up the class of the input provider.

lookupDataSourceV2 branches off based on the type of the data source and returns (in that order):

  1. None for a DataSourceRegister with the short name among the "useV1SourceList" data source names
  2. A TableProvider when the canonical name of the class is not among the "useV1SourceList" data source names
  3. None for other cases
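
A simplified sketch of that decision (assumed shape; locale handling and error handling are omitted):

import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.DataSourceRegister

val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase.split(",").map(_.trim)
val cls = DataSource.lookupDataSource(provider, conf)
cls.getDeclaredConstructor().newInstance() match {
  case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
  case t: TableProvider
      if !useV1Sources.contains(cls.getCanonicalName.toLowerCase) => Some(t)
  case _ => None
}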

lookupDataSourceV2 is used when:

Loading Java Class Of Data Source Provider

lookupDataSource(
  provider: String,
  conf: SQLConf): Class[_]

lookupDataSource first finds the given provider in the backwardCompatibilityMap internal registry, and falls back to the provider name itself when not found.

Note

The provider argument can be either an alias (a simple name, e.g. parquet) or a fully-qualified class name (e.g. org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat).

lookupDataSource then uses the given SQLConf to decide on the class name of the provider for ORC and Avro data sources as follows:

lookupDataSource also considers a DefaultSource-suffixed variant of the provider name (i.e. [provider1].DefaultSource).

lookupDataSource uses Java's ServiceLoader service-provider loading facility to find all data source providers of type DataSourceRegister on the Spark CLASSPATH.

lookupDataSource tries to find the DataSourceRegister provider classes (by their alias) that match the provider name (case-insensitive, e.g. parquet or kafka).

If a single DataSourceRegister provider class is found, lookupDataSource simply returns the class of that data source provider.

If no DataSourceRegister provider class could be found by the short name (alias), lookupDataSource tries to load the provider name as a fully-qualified class name. If that is not successful, lookupDataSource tries to load the other provider name variant (i.e. DefaultSource) instead.
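
A rough sketch of that lookup flow (assumed shape; class-loader selection and the disambiguation of multiple matches are simplified):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

val loader = Thread.currentThread().getContextClassLoader
val byAlias = ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
  .filter(_.shortName().equalsIgnoreCase(provider1))
  .toList
byAlias match {
  case head :: Nil => head.getClass    // single alias match: return its class
  case Nil =>                          // no alias match: try fully-qualified class names
    try loader.loadClass(provider1)
    catch { case _: ClassNotFoundException => loader.loadClass(s"$provider1.DefaultSource") }
  case _ =>
    sys.error(s"Multiple sources found for $provider1") // disambiguation omitted in this sketch
}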

Note

DataFrameWriter.format and DataFrameReader.format methods accept the name of the data source provider to use as an alias or a fully-qualified class name.

import org.apache.spark.sql.execution.datasources.DataSource
val source = "parquet"
val cls = DataSource.lookupDataSource(source, spark.sessionState.conf)

lookupDataSource is used when:

Creating InMemoryFileIndex

createInMemoryFileIndex(
  globbedPaths: Seq[Path]): InMemoryFileIndex

createInMemoryFileIndex creates an InMemoryFileIndex with the following:

  • Root Paths: The given globbedPaths
  • Parameters: Options
  • User-defined schema: User-specified schema
  • FileStatusCache: FileStatusCache
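
A minimal sketch (the constructor parameter names are assumptions based on the list above):

new InMemoryFileIndex(
  sparkSession,
  rootPathsSpecified = globbedPaths,
  parameters = options,
  userSpecifiedSchema = userSpecifiedSchema,
  fileStatusCache = fileStatusCache)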

createInMemoryFileIndex is used when DataSource is requested for the following: