# JDBCRelation

JDBCRelation is a BaseRelation with support for column pruning with filter pushdown and for inserting or overwriting data.
## PrunedFilteredScan

JDBCRelation is a PrunedFilteredScan and supports column pruning with filter pushdown.
## InsertableRelation

JDBCRelation is an InsertableRelation and supports inserting or overwriting data.
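For example (in spark-shell), inserting rows into a table registered with the JDBC data source exercises this contract. A minimal sketch; the connection details and values below are made up for illustration:

```scala
// Register a table backed by the JDBC data source
// (url, dbtable, user and password are made-up examples)
spark.sql("""
  CREATE TABLE projects_jdbc
  USING jdbc
  OPTIONS (
    url 'jdbc:postgresql://localhost/demo',
    dbtable 'projects',
    user 'demo',
    password 'demo'
  )
""")

// INSERT INTO on the JDBC-backed table goes through InsertableRelation.insert
spark.sql("INSERT INTO projects_jdbc VALUES (1, 'spark', 'https://spark.apache.org')")
```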
## Creating Instance

JDBCRelation takes the following to be created:

- StructType
- Partitions (Array[Partition])
- JDBCOptions
- SparkSession
JDBCRelation is created (possibly using apply) when:

- DataFrameReader is requested to jdbc
- JDBCRelation utility is used to apply
- JdbcRelationProvider is requested to create a BaseRelation (for reading)
- JDBCScanBuilder is requested to build a Scan
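The most common path is a plain JDBC read, which ends up with a JDBCRelation behind the scenes. A minimal sketch (in spark-shell, with made-up connection details):

```scala
// Reading over JDBC creates a JDBCRelation under the covers
// (url, dbtable, user and password are made-up examples)
val projects = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "projects")
  .option("user", "demo")
  .option("password", "demo")
  .load()

// The physical plan shows a scan over JDBCRelation(projects)
projects.explain
```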
## Creating JDBCRelation

```scala
apply(
  parts: Array[Partition],
  jdbcOptions: JDBCOptions)(
  sparkSession: SparkSession): JDBCRelation
```
apply
gets the schema (establishing a connection to the database system directly) and creates a JDBCRelation.
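A simplified sketch of that flow, based on the description above (not the exact Spark source, which may differ across versions; the classes involved are internal to Spark):

```scala
import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation}

// apply resolves the table schema first (connecting to the database system)
// and only then creates the JDBCRelation
def apply(
    parts: Array[Partition],
    jdbcOptions: JDBCOptions)(
    sparkSession: SparkSession): JDBCRelation = {
  val schema = JDBCRelation.getSchema(
    sparkSession.sessionState.conf.resolver, jdbcOptions)
  JDBCRelation(schema, parts, jdbcOptions)(sparkSession)
}
```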
## getSchema

```scala
getSchema(
  resolver: Resolver,
  jdbcOptions: JDBCOptions): StructType
```
getSchema
resolves the table (from the given JDBCOptions).
With the customSchema option specified, getSchema
gets the custom schema (based on the table schema from the database system). Otherwise, getSchema
returns the table schema from the database system.
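For example, the customSchema option overrides the data types of selected columns in the table schema resolved from the database (the connection details and column names below are made up):

```scala
// customSchema replaces the Catalyst types of the listed columns;
// all other columns keep the types resolved from the database
// (url, dbtable and column names are made-up examples)
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "projects")
  .option("customSchema", "id STRING, budget DECIMAL(38, 18)")
  .load()

df.printSchema
```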
getSchema is used when:

- JDBCRelation utility is used to create a JDBCRelation
- JdbcRelationProvider is requested to create a BaseRelation (for reading)
## columnPartition

```scala
columnPartition(
  schema: StructType,
  resolver: Resolver,
  timeZoneId: String,
  jdbcOptions: JDBCOptions): Array[Partition]
```
columnPartition
...FIXME
In the end, columnPartition
prints out the following INFO message to the logs:
```text
Number of partitions: [numPartitions], WHERE clauses of these partitions: [whereClause]
```
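Column-based partitioning is configured with the partitionColumn, lowerBound, upperBound and numPartitions options, e.g. (with made-up connection details):

```scala
// Four partitions over the id column; every partition gets its own WHERE clause
// (url and dbtable are made-up examples)
val projects = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "projects")
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .load()

projects.rdd.getNumPartitions // 4
```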
columnPartition is used when:

- JdbcRelationProvider is requested to create a BaseRelation (for reading)
- JDBCScanBuilder is requested to build a Scan
## Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.JDBCRelation.name = org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
logger.JDBCRelation.level = all
```
Refer to Logging.
## Review Me

When requested for a human-friendly text representation (toString), JDBCRelation returns the following text with the table name (from the JDBCOptions) and the number of partitions:

```text
JDBCRelation([table]) [numPartitions=[number]]
```

```text
scala> df.explain
== Physical Plan ==
*Scan JDBCRelation(projects) [numPartitions=1] [id#0,name#1,website#2] ReadSchema: struct<id:int,name:string,website:string>
```
JDBCRelation turns the needConversion flag off (to announce that buildScan produces an RDD[InternalRow] already and the DataSourceStrategy execution planning strategy does not have to do the RDD conversion).
### Finding Unhandled Filter Predicates (unhandledFilters Method)

```scala
unhandledFilters(
  filters: Array[Filter]): Array[Filter]
```

unhandledFilters is part of the BaseRelation abstraction.

unhandledFilters returns the Filter predicates in the input filters that could not be converted to a SQL expression (and are therefore unhandled by the JDBC data source natively).
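A rough sketch of that logic, under the assumption that there is a helper that compiles a Filter to a WHERE-clause fragment (the compileToSql helper below is hypothetical; the real code delegates the compilation to the JDBC data source internals):

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

// Hypothetical stand-in for the actual filter-to-SQL compilation
def compileToSql(filter: Filter): Option[String] = filter match {
  case GreaterThan(attr, value) => Some(s"$attr > $value")
  case IsNotNull(attr)          => Some(s"$attr IS NOT NULL")
  case _                        => None // not convertible in this sketch
}

// unhandledFilters keeps exactly the filters that could not be compiled to SQL
def unhandledFilters(filters: Array[Filter]): Array[Filter] =
  filters.filter(f => compileToSql(f).isEmpty)
```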
### Schema of Tuples (schema Property)

```scala
schema: StructType
```

schema uses JDBCRDD to resolveTable given the JDBCOptions (that simply returns the schema of the table, also known as the default table schema).

If the customSchema JDBC option was defined, schema uses JdbcUtils to replace the data types in the default table schema.

schema is part of the BaseRelation abstraction.
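A simplified sketch of that logic, based on the description above (not the exact Spark source; the helper names and signatures may differ across versions):

```scala
// Resolve the default table schema from the database system and,
// if the customSchema option is set, override the data types of the listed columns
override val schema: StructType = {
  val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
  jdbcOptions.customSchema match {
    case Some(customSchema) =>
      JdbcUtils.getCustomSchema(
        tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
    case None => tableSchema
  }
}
```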
### Inserting or Overwriting Data to JDBC Table (insert Method)

```scala
insert(
  data: DataFrame,
  overwrite: Boolean): Unit
```

insert is part of the InsertableRelation abstraction.

insert simply requests the input DataFrame for a DataFrameWriter and uses it to save the data to a JDBC table.

insert also requests the DataFrameWriter to set the save mode as Overwrite or Append per the input overwrite flag.

!!! note
    insert uses a "trick" to reuse the code that is responsible for saving data to a JDBC table.
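A minimal sketch of that trick (not the exact Spark source; the url, table and connection properties would come from the relation's JDBCOptions, and the values below are made up):

```scala
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode}

// Reuse DataFrameWriter.jdbc to save the data, overwriting or appending
// per the overwrite flag (connection details are made-up examples)
def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val url = "jdbc:postgresql://localhost/demo"
  val table = "projects"
  val connectionProperties = new Properties()
  data.write
    .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
    .jdbc(url, table, connectionProperties)
}
```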
### Building Distributed Data Scan with Column Pruning and Filter Pushdown (buildScan Method)

```scala
buildScan(
  requiredColumns: Array[String],
  filters: Array[Filter]): RDD[Row]
```

buildScan is part of the PrunedFilteredScan abstraction.

buildScan uses the JDBCRDD object to create an RDD[Row] for a distributed data scan.
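A simplified sketch of that call, based on the description above (not the exact Spark source; the exact JDBCRDD helper and its signature may differ across versions):

```scala
// Hand the pruned columns, the pushed-down filters and the JDBC partitions
// over to JDBCRDD to build the distributed scan
override def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter]): RDD[Row] = {
  JDBCRDD.scanTable(
    sparkSession.sparkContext,
    schema,
    requiredColumns,
    filters,
    parts,
    jdbcOptions)
    .asInstanceOf[RDD[Row]] // rows are InternalRows already (needConversion is off)
}
```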