
FileSourceScanExec Physical Operator

FileSourceScanExec is a DataSourceScanExec that represents a scan over a HadoopFsRelation.

Creating Instance

FileSourceScanExec takes the following to be created:

FileSourceScanExec is created when:

  • FileSourceStrategy execution planning strategy is executed (to plan a scan over a HadoopFsRelation)

Data Filters

FileSourceScanExec is given Data Filters (Expressions) when created.

The Data Filters are filter expressions on the data columns of the HadoopFsRelation (i.e. columns that are not partition columns and so are not part of the Partition Filters), as determined by the FileSourceStrategy execution planning strategy.

Partition Filters

FileSourceScanExec is given Partition Filters (Expressions) when created.

The Partition Filters are the PushedDownFilters (based on the partition columns of the HadoopFsRelation), as determined by the FileSourceStrategy execution planning strategy.
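
The split between the two can be observed on any partitioned table. The following spark-shell sketch (the path and column names are arbitrary) shows the predicate on the partition column ending up among the PartitionFilters, while the predicate on a data column is pushed down as a data filter:

import org.apache.spark.sql.execution.FileSourceScanExec

// A throwaway partitioned parquet table (arbitrary path and schema)
spark.range(10)
  .withColumn("part", $"id" % 2)
  .write.mode("overwrite").partitionBy("part").parquet("/tmp/filters_demo")

val q = spark.read.parquet("/tmp/filters_demo").where($"part" === 0 && $"id" > 3)
val scan = q.queryExecution.executedPlan.collectFirst { case s: FileSourceScanExec => s }.get

// part = 0 becomes a partition filter; id > 3 becomes a data filter (pushed down)
println(scan.metadata("PartitionFilters"))
println(scan.metadata("PushedFilters"))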

Node Name Prefix

Signature
nodeNamePrefix: String

nodeNamePrefix is part of the DataSourceScanExec abstraction.

nodeNamePrefix is always File.

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeNamePrefix == "File")

scala> println(fileScanExec.simpleString)
FileScan csv [id#20,name#21,city#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/datasets/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string,city:string>

Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| filesSize | size of files read | |
| metadataTime | metadata time (ms) | |
| numFiles | number of files | |
| numOutputRows | number of output rows | |

FileSourceScanExec in web UI (Details for Query)

Columnar Scan Metrics

The following performance metrics are available only with supportsColumnar enabled.

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| scanTime | scan time | |

Partition Scan Metrics

The following performance metrics are available only when partitions are used.

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| numPartitions | number of partitions read | |
| pruningTime | dynamic partition pruning time | |

Dynamic Partition Pruning Scan Metrics

The following performance metrics are available only when there is a dynamic pruning filter (isDynamicPruningFilter) among the partition filters.

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| staticFilesNum | static number of files read | |
| staticFilesSize | static size of files read | |

Metadata

metadata: Map[String, String]

metadata is part of the DataSourceScanExec abstraction.

metadata...FIXME

inputRDDs

inputRDDs(): Seq[RDD[InternalRow]]

inputRDDs is part of the DataSourceScanExec abstraction.

inputRDDs returns the single input RDD (in a one-element collection).
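
With the scan value from the Demo section at the end of this page, the one-element contract can be checked directly:

// scan is the FileSourceScanExec picked from the executed plan in the Demo section
assert(scan.inputRDDs().length == 1)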

Input RDD

inputRDD: RDD[InternalRow]

lazy value

inputRDD is a Scala lazy value which is computed once when accessed and never changes afterwards.

inputRDD is an input RDD that is used when FileSourceScanExec physical operator is requested for inputRDDs and to execute.

When first accessed, inputRDD requests the HadoopFsRelation for the underlying FileFormat, which is in turn requested to build a data reader with partition column values appended (with the input parameters taken from the properties of the HadoopFsRelation and the pushedDownFilters).

If the HadoopFsRelation has a bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing (with the bucketing specification, the reader, the selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD creates a non-bucketed read RDD using createReadRDD.

Creating RDD for Non-Bucketed Read

createReadRDD(
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Array[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]

createReadRDD prints out the following INFO message to the logs (with maxSplitBytes hint and openCostInBytes):

Planning scan with bin packing, max size: [maxSplitBytes] bytes,
open cost is considered as scanning [openCostInBytes] bytes.
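
Both hints come from regular configuration properties, so the numbers in the INFO message can be influenced per session. A small sketch (the values are arbitrary): spark.sql.files.maxPartitionBytes caps the maximum split size and spark.sql.files.openCostInBytes is the open cost.

// Influence the bin packing of the next file-based scans in this session
spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")

println(spark.conf.get("spark.sql.files.maxPartitionBytes"))
println(spark.conf.get("spark.sql.files.openCostInBytes"))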

createReadRDD determines whether Bucketing is enabled or not (based on spark.sql.sources.bucketing.enabled) for bucket pruning.

Bucket Pruning

Bucket Pruning is an optimization to filter out data files from scanning (based on optionalBucketSet).

With Bucketing disabled or optionalBucketSet undefined, all files are included in scanning.
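
The idea can be sketched in plain Scala (illustrative only; the file names and the BitSet below are hypothetical stand-ins for Spark's internal structures):

import scala.collection.immutable.BitSet

// Keep only the files of the buckets listed in the optional bucket set;
// with the set undefined, every file is scanned.
def prune(
    filesByBucket: Map[Int, Seq[String]],
    optionalBucketSet: Option[BitSet]): Map[Int, Seq[String]] =
  optionalBucketSet match {
    case Some(buckets) => filesByBucket.filter { case (id, _) => buckets(id) }
    case None          => filesByBucket
  }

val filesByBucket = Map(
  0 -> Seq("part-00000_00000.parquet"),
  1 -> Seq("part-00001_00001.parquet"),
  2 -> Seq("part-00002_00002.parquet"))

assert(prune(filesByBucket, Some(BitSet(1))) == Map(1 -> Seq("part-00001_00001.parquet")))
assert(prune(filesByBucket, None) == filesByBucket)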

createReadRDD splits files to be scanned (in the given selectedPartitions), possibly applying bucket pruning (with Bucketing enabled). createReadRDD uses the following:

createReadRDD sorts the split files (by length in reverse order).
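
The sorting and packing steps can be approximated with a first-fit-decreasing bin packing. The following is a minimal sketch (FileSlice is a hypothetical stand-in for PartitionedFile and the sizes are arbitrary), not the actual Spark code:

// Greedily pack files (largest first) into partitions; every file costs its
// size plus openCostInBytes, and a partition is closed once adding the next
// file would exceed maxSplitBytes.
final case class FileSlice(path: String, sizeInBytes: Long)

def packFiles(
    files: Seq[FileSlice],
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[FileSlice]] = {
  val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[FileSlice]]
  val current = scala.collection.mutable.ArrayBuffer.empty[FileSlice]
  var currentSize = 0L

  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current.toList
    current.clear()
    currentSize = 0L
  }

  files.sortBy(-_.sizeInBytes).foreach { file =>
    if (currentSize + file.sizeInBytes > maxSplitBytes) closePartition()
    currentSize += file.sizeInBytes + openCostInBytes
    current += file
  }
  closePartition()
  partitions.toList
}

// 128 MB budget, 4 MB open cost => two partitions: [a] and [b, c]
val slices = Seq(
  FileSlice("a", 100L * 1024 * 1024),
  FileSlice("b", 60L * 1024 * 1024),
  FileSlice("c", 10L * 1024 * 1024))
packFiles(slices, maxSplitBytes = 128L * 1024 * 1024, openCostInBytes = 4L * 1024 * 1024).foreach(println)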

In the end, createReadRDD creates a FileScanRDD with the following:

| Property | Value |
|----------|-------|
| readFunction | Input readFile function |
| filePartitions | Partitions |
| readSchema | requiredSchema with the partitionSchema of the input HadoopFsRelation |
| metadataColumns | metadataColumns |

Dynamically Selected Partitions

dynamicallySelectedPartitions: Array[PartitionDirectory]

lazy value

dynamicallySelectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.

dynamicallySelectedPartitions...FIXME

Selected Partitions

selectedPartitions: Seq[PartitionDirectory]

lazy value

selectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.

selectedPartitions...FIXME

bucketedScan Flag

bucketedScan: Boolean

lazy value

bucketedScan is a Scala lazy value which is computed once when accessed and cached afterwards.

bucketedScan...FIXME

bucketedScan is used when:

  • FIXME

Output Data Ordering Requirements

outputOrdering: Seq[SortOrder]

outputOrdering is part of the SparkPlan abstraction.

Danger

Review Me

outputOrdering is a SortOrder expression for every sort column in Ascending order only when the following all hold:

Otherwise, outputOrdering is simply empty (Nil).

Output Data Partitioning Requirements

outputPartitioning: Partitioning

outputPartitioning is part of the SparkPlan abstraction.

Danger

Review Me

outputPartitioning can be one of the following:

  • HashPartitioning (for a bucketed scan, with the bucket column(s) and the number of buckets)

  • UnknownPartitioning (the default) otherwise

Fully-Qualified Class Names of ColumnVectors

vectorTypes: Option[Seq[String]]

vectorTypes is part of the SparkPlan abstraction.

Danger

Review Me

vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.

doExecuteColumnar

doExecuteColumnar(): RDD[ColumnarBatch]

doExecuteColumnar is part of the SparkPlan abstraction.

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.


Danger

Review Me

doExecute branches off per supportsBatch flag.

Note

supportsBatch flag can be enabled for ParquetFileFormat and OrcFileFormat built-in file formats (under certain conditions).

With supportsBatch flag enabled, doExecute creates a WholeStageCodegenExec physical operator (with the FileSourceScanExec as the child physical operator and codegenStageId as 0) and executes it right after.

With supportsBatch flag disabled, doExecute creates an unsafeRows RDD to scan over, which differs based on the needsUnsafeRowConversion flag.

If needsUnsafeRowConversion flag is on, doExecute takes the input RDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal):

  1. Creates a UnsafeProjection for the schema

  2. Initializes the UnsafeProjection

  3. Maps over the rows in a partition iterator using the UnsafeProjection projection

Otherwise, doExecute simply takes the input RDD as the unsafeRows RDD (with no changes).

doExecute takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows and incrementing the numOutputRows metric.

Tip

Use RDD.toDebugString to review the RDD lineage and "reverse-engineer" the values of the supportsBatch and needsUnsafeRowConversion flags given the number of RDDs.

With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.

Creating FileScanRDD with Bucketing Support

createBucketedReadRDD(
  bucketSpec: BucketSpec,
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Array[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]

Danger

Review Me

createBucketedReadRDD prints the following INFO message to the logs:

Planning with [numBuckets] buckets

createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD requests the block locations (getBlockLocations) and the block hosts (getBlockHosts).

createBucketedReadRDD then groups the PartitionedFiles by bucket ID.

Note

Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.
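
A simplified extraction of the bucket ID from such a file name could look as follows (a sketch with a deliberately simplified regex; Spark uses its own internal helper for this):

// The bucket ID is the N in the trailing "_0000N" part of the file name,
// e.g. ..._00003.c000.snappy.parquet => bucket 3
val BucketIdRegex = """.*_(\d+)(?:\..*)?$""".r

def bucketId(fileName: String): Option[Int] = fileName match {
  case BucketIdRegex(id) => Some(id.toInt)
  case _                 => None
}

assert(bucketId("part-00001-5301d371_00003.c000.snappy.parquet").contains(3))
assert(bucketId("no-bucket-here.parquet").isEmpty)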

createBucketedReadRDD prunes (filters out) the bucket files for the bucket IDs that are not listed in the bucket IDs for bucket pruning.

createBucketedReadRDD creates a FilePartition (file block) for every bucket ID and the (pruned) bucket PartitionedFiles.

In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile for the read function and the file blocks (FilePartitions) for every bucket ID for partitions).

Tip

Use RDD.toDebugString to see FileScanRDD in the RDD execution plan (aka RDD lineage).

// Create a bucketed table
spark.range(8).write.bucketBy(4, "id").saveAsTable("b1")

scala> sql("desc extended b1").where($"col_name" like "%Bucket%").show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   Num Buckets|        4|       |
|Bucket Columns|   [`id`]|       |
+--------------+---------+-------+

val bucketedTable = spark.table("b1")

val lineage = bucketedTable.queryExecution.toRdd.toDebugString
scala> println(lineage)
(4) MapPartitionsRDD[26] at toRdd at <console>:26 []
|  FileScanRDD[25] at toRdd at <console>:26 []

createBucketedReadRDD is used when:

  • FileSourceScanExec physical operator is requested for the inputRDD (for a bucketed read)

needsUnsafeRowConversion Flag

needsUnsafeRowConversion: Boolean

needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

  1. FileFormat of the HadoopFsRelation is ParquetFileFormat

  2. spark.sql.parquet.enableVectorizedReader configuration property is enabled

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).
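
The second condition maps to a regular configuration property, so it can be inspected (and flipped) per session:

// spark.sql.parquet.enableVectorizedReader backs the second condition above (default: true)
println(spark.conf.get("spark.sql.parquet.enableVectorizedReader"))

// With the vectorized reader disabled, parquet scans no longer need the unsafe-row conversion
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)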

needsUnsafeRowConversion is used when:

  • FileSourceScanExec physical operator is requested to execute (doExecute)

supportsColumnar Flag

supportsColumnar: Boolean

supportsColumnar is part of the SparkPlan abstraction.

supportsColumnar...FIXME

Demo

// Create a bucketed data source table
// It is one of the most complex examples of a LogicalRelation with a HadoopFsRelation
val tableName = "bucketed_4_id"
spark
  .range(100)
  .withColumn("part", $"id" % 2)
  .write
  .partitionBy("part")
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)
val q = spark.table(tableName)

val sparkPlan = q.queryExecution.executedPlan
scala> :type sparkPlan
org.apache.spark.sql.execution.SparkPlan

scala> println(sparkPlan.numberedTreeString)
00 *(1) FileScan parquet default.bucketed_4_id[id#7L,part#8L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

scala> scan.metadata.toSeq.sortBy(_._1).map { case (k, v) => s"$k -> $v" }.foreach(println)
Batched -> true
Format -> Parquet
Location -> CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id]
PartitionCount -> 2
PartitionFilters -> []
PushedFilters -> []
ReadSchema -> struct<id:bigint>
SelectedBucketsCount -> 4 out of 4

As a DataSourceScanExec, FileSourceScanExec uses Scan for the prefix of the node name.

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeName startsWith "Scan")

When executed, FileSourceScanExec operator creates a FileScanRDD (for bucketed and non-bucketed reads).

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

val rdd = scan.execute
scala> println(rdd.toDebugString)
(6) MapPartitionsRDD[7] at execute at <console>:28 []
 |  FileScanRDD[2] at execute at <console>:27 []

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(rdd.dependencies.head.rdd.isInstanceOf[FileScanRDD])

FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query.

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"Bucket $bucketId => $f"

scala> println(bucketFiles.size)
51

scala> bucketFiles.foreach(println)
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00004-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00001-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
...
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00005-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00000-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-431, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00007-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]

FileSourceScanExec uses a HashPartitioning or the default UnknownPartitioning as the output partitioning scheme.
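
For the scan over the bucketed table in the demo above (bucketed by id into 4 buckets), a HashPartitioning over id with 4 partitions is the expected outcome; a scan over a non-bucketed table reports UnknownPartitioning instead. Reusing the scan value:

println(scan.outputPartitioning)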

FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or explain).

Pushed Filters: [pushedDownFilters]

Logging

Enable ALL logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.FileSourceScanExec.name = org.apache.spark.sql.execution.FileSourceScanExec
logger.FileSourceScanExec.level = all

Refer to Logging.