FileScanRDD

FileScanRDD is the input RDD of the FileSourceScanExec leaf physical operator (for Whole-Stage Java Code Generation).

RDD

Find out more about the RDD abstraction in The Internals of Apache Spark.

Creating Instance

FileScanRDD takes the following to be created:

SparkSession

Read function that takes a PartitionedFile and produces internal rows ((PartitionedFile) => Iterator[InternalRow])

FilePartitions (file blocks)

FileScanRDD is created when FileSourceScanExec leaf physical operator is requested for the input RDD.

Configuration Properties

FileScanRDD uses the following configuration properties when requested to compute a partition: spark.sql.files.ignoreCorruptFiles and spark.sql.files.ignoreMissingFiles.
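
As a quick illustration, both properties can be set at runtime in a spark-shell session (as in the Demo below); this is only a sketch, and both properties default to false:

// Tell file-based data sources to skip corrupt and missing files
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
spark.conf.set("spark.sql.files.ignoreMissingFiles", true)

// Verify the current values
assert(spark.conf.get("spark.sql.files.ignoreCorruptFiles") == "true")
assert(spark.conf.get("spark.sql.files.ignoreMissingFiles") == "true")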

FilePartition

FileScanRDD is given FilePartitions when created. A FilePartition is a custom RDD partition that holds PartitionedFiles (file blocks).
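
The following spark-shell snippet (a variant of the Demo below) shows that every partition of a FileScanRDD is indeed a FilePartition:

import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}

// Any file-based query will do
val q = spark.read.text("README.md")
val scan = q.queryExecution.executedPlan.collectFirst { case e: FileSourceScanExec => e }.get
val fileScanRDD = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

// Every RDD partition is a FilePartition with one or more PartitionedFiles (file blocks)
fileScanRDD.partitions.foreach { p =>
  val fp = p.asInstanceOf[FilePartition]
  println(s"Partition ${fp.index} has ${fp.files.length} file block(s)")
}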

Placement Preferences of Partition (Preferred Locations)

getPreferredLocations(
  split: RDDPartition): Seq[String]

getPreferredLocations is part of Spark Core's RDD abstraction.


getPreferredLocations assumes that the given RDDPartition is actually a FilePartition and requests it for preferredLocations.
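
Continuing the spark-shell session from the FilePartition snippet above, the effect can be observed through RDD.preferredLocations (which delegates to getPreferredLocations); for a local file the preferred locations are typically empty:

// Placement preferences come from the FilePartition (e.g. HDFS block locations)
fileScanRDD.partitions.foreach { p =>
  println(s"Partition ${p.index}: ${fileScanRDD.preferredLocations(p).mkString(", ")}")
}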

RDD Partitions

getPartitions: Array[RDDPartition]

getPartitions is part of Spark Core's RDD abstraction.


getPartitions simply returns the FilePartitions this FileScanRDD was created with.
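
Continuing the same spark-shell session, this can be verified directly on the FileScanRDD extracted earlier:

import org.apache.spark.sql.execution.datasources.FilePartition

// getPartitions returns the FilePartitions given at creation time
println(s"${fileScanRDD.getNumPartitions} FilePartition(s)")
assert(fileScanRDD.partitions.forall(_.isInstanceOf[FilePartition]))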

Computing Partition

compute(
  split: RDDPartition,
  context: TaskContext): Iterator[InternalRow]

Note

The RDDPartition given is actually a FilePartition with one or more PartitionedFiles (that getPartitions returned).

compute is part of Spark Core's RDD abstraction.
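
For illustration only, the hypothetical computeSketch below shows the core idea of compute: chain the per-file iterators produced by the read function the RDD was created with. The names computeSketch and readFunction are assumptions of this sketch; Spark's actual compute additionally handles spark.sql.files.ignoreCorruptFiles and spark.sql.files.ignoreMissingFiles, tracks input metrics, supports vectorized (ColumnarBatch) reads, and closes the current reader on task completion.

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Hypothetical, heavily simplified sketch of what compute boils down to
def computeSketch(
    split: Partition,
    context: TaskContext,
    readFunction: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] = {
  val files = split.asInstanceOf[FilePartition].files.iterator
  new Iterator[InternalRow] {
    private var current: Iterator[InternalRow] = Iterator.empty
    def hasNext: Boolean = current.hasNext || nextFile()
    def next(): InternalRow = current.next()
    // Open the next PartitionedFile (file block) with the read function
    private def nextFile(): Boolean =
      files.hasNext && { current = readFunction(files.next()); hasNext }
  }
}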

Retrieving Next Element

next(): Object

next returns the next element from the current iterator over the elements of a file block (PartitionedFile).

next also increments the metrics of the number of bytes and the number of rows read (which, for vectorized reads, is the number of rows in a ColumnarBatch).

next is part of Scala's Iterator abstraction.
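
One way to observe these metrics from a spark-shell session (assuming the same README.md file as in the Demo) is a SparkListener that prints the task-level input metrics that next contributes to:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the records/bytes read per task; the scan tasks reflect the counters
// that next increments (non-scan tasks report zero)
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val in = taskEnd.taskMetrics.inputMetrics
    println(s"records read: ${in.recordsRead}, bytes read: ${in.bytesRead}")
  }
})
spark.read.text("README.md").count()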

Demo

val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan
scala> println(sparkPlan.numberedTreeString)
00 FileScan text [value#0] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute
scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
 |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.FileScanRDD.name = org.apache.spark.sql.execution.datasources.FileScanRDD
logger.FileScanRDD.level = all

Refer to Logging.