# FileScanRDD

FileScanRDD is the input RDD of the FileSourceScanExec leaf physical operator (for Whole-Stage Java Code Generation).

Note
Find out more on the RDD abstraction in The Internals of Apache Spark.
## Creating Instance

FileScanRDD takes the following to be created:

- SparkSession
- Read Function of PartitionedFiles to InternalRows (`(PartitionedFile) => Iterator[InternalRow]`; see the sketch after this list)
- FilePartitions
- Read Schema
- Metadata Columns
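
The read function is plain Scala: it maps a single PartitionedFile (a file block) to an iterator over its rows. Below is a do-nothing stand-in with the right shape, for illustration only; in Spark the real function is produced by the file format (e.g. FileFormat.buildReaderWithPartitionValues).

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Illustration only: a read function that yields no rows for any file block.
// Real read functions open the file block and return its rows as InternalRows.
val noopReadFunction: PartitionedFile => Iterator[InternalRow] =
  (_: PartitionedFile) => Iterator.empty[InternalRow]
```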
FileScanRDD is created when:

- FileSourceScanExec physical operator is requested to createBucketedReadRDD and createNonBucketedReadRDD (i.e., when the FileSourceScanExec operator is requested for the input RDD while the WholeStageCodegenExec physical operator is executed)
## Configuration Properties

FileScanRDD uses the following configuration properties (when requested to compute a partition):

- spark.sql.files.ignoreCorruptFiles
- spark.sql.files.ignoreMissingFiles
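
Both properties can be flipped at runtime in spark-shell (assuming the active `spark` session); they default to false.

```scala
// With the flags enabled, corrupt or missing files are skipped (with a warning)
// instead of failing the whole scan.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
spark.conf.set("spark.sql.files.ignoreMissingFiles", true)
```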
## FilePartition

FileScanRDD is given FilePartitions when created. FilePartitions are custom RDD partitions, each with one or more PartitionedFiles (file blocks).
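
A quick way to look at the FilePartitions (and their PartitionedFiles) in spark-shell, as a shortened variant of the Demo below; the accessors `filePartitions`, `files`, `filePath`, `start` and `length` are the names used in recent Spark releases:

```scala
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FileScanRDD

val plan = spark.read.text("README.md").queryExecution.executedPlan
val scan = plan.collectFirst { case exec: FileSourceScanExec => exec }.get
val fileScanRDD = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

// One entry per FilePartition; every partition carries one or more file blocks.
fileScanRDD.filePartitions.foreach { p =>
  println(s"FilePartition ${p.index}")
  p.files.foreach(f => println(s"  ${f.filePath} start=${f.start} length=${f.length}"))
}
```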
## Placement Preferences of Partition (Preferred Locations)

```scala
getPreferredLocations(
  split: RDDPartition): Seq[String]
```

getPreferredLocations is part of Spark Core's RDD abstraction.

getPreferredLocations assumes that the given RDDPartition is actually a FilePartition and requests it for its preferredLocations.
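
Continuing the spark-shell snippet from the FilePartition section above, the placement preferences can be inspected through Spark Core's public RDD.preferredLocations, which delegates to getPreferredLocations for a partition that is neither checkpointed nor cached. For files on a local file system the list is frequently empty; on HDFS you would typically see hostnames.

```scala
// Print the placement preferences of every partition of the FileScanRDD.
fileScanRDD.partitions.foreach { p =>
  println(s"Partition ${p.index}: ${fileScanRDD.preferredLocations(p).mkString(", ")}")
}
```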
## RDD Partitions

```scala
getPartitions: Array[RDDPartition]
```

getPartitions is part of Spark Core's RDD abstraction.

getPartitions simply returns the FilePartitions.
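
Continuing the same spark-shell snippet, the RDD partitions therefore line up one-to-one with the FilePartitions given at creation time:

```scala
// getPartitions returns the FilePartitions, so the counts (and indices) match.
assert(fileScanRDD.partitions.length == fileScanRDD.filePartitions.length)
```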
## Computing Partition

```scala
compute(
  split: RDDPartition,
  context: TaskContext): Iterator[InternalRow]
```

Note
The RDDPartition given is actually a FilePartition with one or more PartitionedFiles (that getPartitions returned).

compute is part of Spark Core's RDD abstraction.
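
A heavily simplified sketch of the iteration pattern compute follows (not Spark's actual code): walk the partition's PartitionedFiles one by one and chain the per-file iterators produced by the read function. Metrics, the ignoreCorruptFiles / ignoreMissingFiles handling and task-completion callbacks are all left out.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Sketch only: concatenate the rows of every file block in the partition.
def computeSketch(
    partition: FilePartition,
    readFunction: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] =
  partition.files.iterator.flatMap(readFunction)
```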
## Retrieving Next Element

```scala
next(): Object
```

next takes the next element of the current iterator over the elements of a file block (PartitionedFile).

next increments the metrics of bytes read and number of rows read (which, for vectorized reads, can be the number of rows in a ColumnarBatch).

next is part of Scala's Iterator abstraction.
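
Those bytes and rows surface as task-level input metrics, so one way to observe the effect in spark-shell is a throwaway SparkListener (added here only for the demo) that prints the input metrics of every finished task:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Report the records/bytes accumulated while iterating over the file blocks of a task.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val in = taskEnd.taskMetrics.inputMetrics
    println(s"task ${taskEnd.taskInfo.taskId}: ${in.recordsRead} records, ${in.bytesRead} bytes read")
  }
})

spark.read.text("README.md").count()
```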
## Demo

```text
val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan

scala> println(sparkPlan.numberedTreeString)
00 FileScan text [value#0] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute

scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
 |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])
```
## Logging

Enable ALL logging level for the org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

```text
logger.FileScanRDD.name = org.apache.spark.sql.execution.datasources.FileScanRDD
logger.FileScanRDD.level = all
```

Refer to Logging.