FileScanRDD¶
FileScanRDD is the input RDD of the FileSourceScanExec leaf physical operator (for Whole-Stage Java Code Generation).
RDD
Find out more about the RDD abstraction in The Internals of Apache Spark.
Creating Instance¶
FileScanRDD takes the following to be created:
- SparkSession
- Read Function of PartitionedFiles to InternalRows ((PartitionedFile) => Iterator[InternalRow])
- FilePartitions
- Read Schema
- Metadata Columns
FileScanRDD is created when:
- FileSourceScanExec physical operator is requested to createBucketedReadRDD and createNonBucketedReadRDD (when the FileSourceScanExec operator is requested for the input RDD while the WholeStageCodegenExec physical operator is executed)
Configuration Properties¶
FileScanRDD uses the following configuration properties (when requested to compute a partition):
- spark.sql.files.ignoreCorruptFiles
- spark.sql.files.ignoreMissingFiles
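Both properties can be set on the current session before a query is run, e.g. in spark-shell (a minimal example):

// Skip corrupt or missing files rather than fail the scan
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")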
FilePartition¶
FileScanRDD is given FilePartitions when created. FilePartitions are custom RDD partitions with PartitionedFiles (file blocks).
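The FilePartitions (and their PartitionedFiles) of an existing FileScanRDD can be inspected directly, e.g. in spark-shell (a minimal sketch, assuming fileScanRDD is the FileScanRDD from the Demo below):

import org.apache.spark.sql.execution.datasources.FilePartition
fileScanRDD.partitions.foreach { p =>
  val filePartition = p.asInstanceOf[FilePartition]
  // every RDD partition is a FilePartition with one or more PartitionedFiles (file blocks)
  println(s"Partition ${filePartition.index}: ${filePartition.files.length} file block(s)")
}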
Placement Preferences of Partition (Preferred Locations)¶
getPreferredLocations(
split: RDDPartition): Seq[String]
getPreferredLocations is part of Spark Core's RDD abstraction.
getPreferredLocations assumes that the given RDDPartition is actually a FilePartition and requests it for preferredLocations.
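The preferred locations can also be looked up from the outside through the public RDD API (a minimal sketch, assuming fileScanRDD from the Demo below; for local files the result is usually empty):

// the preferred locations of a partition come from its FilePartition
val aPartition = fileScanRDD.partitions.head
println(fileScanRDD.preferredLocations(aPartition))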
RDD Partitions¶
getPartitions: Array[RDDPartition]
getPartitions is part of Spark Core's RDD abstraction.
getPartitions simply returns the FilePartitions.
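In other words, the number of RDD partitions is exactly the number of FilePartitions given at creation time, and every partition can safely be cast to a FilePartition (assuming fileScanRDD from the Demo below):

import org.apache.spark.sql.execution.datasources.FilePartition
assert(fileScanRDD.partitions.forall(_.isInstanceOf[FilePartition]))
println(fileScanRDD.getNumPartitions)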
Computing Partition¶
compute(
split: RDDPartition,
context: TaskContext): Iterator[InternalRow]
Note
The RDDPartition given is actually a FilePartition with one or more PartitionedFiles (that getPartitions returned).
compute is part of Spark Core's RDD abstraction.
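At a high level, compute iterates over the PartitionedFiles of the given FilePartition and opens every file block with the read function given at creation time. The following is a much-simplified, hypothetical sketch of that control flow only (not the actual Spark code; metrics, task-completion callbacks and the handling of corrupt or missing files are omitted):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Hypothetical sketch: read every file block of a FilePartition with the read function
def computeSketch(
    split: FilePartition,
    readFunction: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] =
  split.files.iterator.flatMap(readFunction)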
Retrieving Next Element¶
next(): Object
next returns the next element from the current iterator over the elements of a file block (PartitionedFile).
next also updates the metrics for the bytes read and the number of rows read (which, for vectorized reads, could be the number of rows in a ColumnarBatch).
next is part of Scala's Iterator abstraction.
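The row-count contribution of a single element thus depends on whether the read is vectorized, which could be sketched as follows (a hypothetical helper, not Spark code):

import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch: how many rows a single element returned by next() accounts for
def rowsRead(nextElement: Object): Long = nextElement match {
  case batch: ColumnarBatch => batch.numRows().toLong // vectorized reads return whole batches
  case _ => 1L                                        // row-based reads return a single InternalRow
}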
Demo¶
val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan
scala> println(sparkPlan.numberedTreeString)
00 FileScan text [value#0] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
val inputRDD = scan.inputRDDs.head
import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])
val rdd = scan.execute
scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
| FileScanRDD[0] at inputRDDs at <console>:26 []
val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])
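fileScanRDD is the FileScanRDD assumed by the snippets in the sections above. Running an action on it triggers compute for every partition, e.g.:

// triggers FileScanRDD.compute for every FilePartition
println(fileScanRDD.count())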
Logging¶
Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
logger.FileScanRDD.name = org.apache.spark.sql.execution.datasources.FileScanRDD
logger.FileScanRDD.level = all
Refer to Logging.