DataSkippingReaderBase

DataSkippingReaderBase is an extension of the DeltaScanGenerator abstraction for DeltaScan generators.

Contract

allFiles Dataset (of AddFiles)

allFiles: Dataset[AddFile]

Dataset of AddFiles

Used when:

DeltaLog

deltaLog: DeltaLog

DeltaLog

Used when:

Metadata

metadata: Metadata

Metadata

Used when:

numOfFiles

numOfFiles: Long

Used when:

Path

path: Path

Hadoop Path

Redacted Path

redactedPath: String

Used when:

Schema

schema: StructType

Used when:

sizeInBytes

sizeInBytes: Long

Used when:

version

version: Long

Used when:

Implementations

spark.databricks.delta.stats.skipping

DataSkippingReaderBase uses the spark.databricks.delta.stats.skipping configuration property for filesForScan.
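The property is a regular Spark SQL configuration entry, so it can be inspected or toggled through the configuration API. A minimal sketch (only the property name comes from this page; the session setup is generic):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()

// Enable (or disable) data skipping for filesForScan
spark.conf.set("spark.databricks.delta.stats.skipping", "true")
assert(spark.conf.get("spark.databricks.delta.stats.skipping") == "true")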

withStats DataFrame

withStats: DataFrame

withStats returns the withStatsInternal DataFrame.

withStats is used when:

withStatsInternal DataFrame

withStatsInternal: DataFrame

withStatsInternal requests the withStatsCache for the cached Dataset (DS).

withStatsCache

withStatsCache: CachedDS[Row]
Lazy Value

withStatsCache is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
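For illustration only, a stand-alone (non-Delta) example of that lazy-val behaviour:

object LazyValDemo extends App {
  lazy val expensive: Int = {
    println("initializing...")  // printed only once, on first access
    42
  }
  println(expensive)  // "initializing..." then 42
  println(expensive)  // 42 only; the cached value is reused
}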

withStatsCache caches the withStatsInternal0 DataFrame under the following name (with the version and the redactedPath):

Delta Table State with Stats #[version] - [redactedPath]
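In Scala string-interpolation terms, the cache name is equivalent to:

s"Delta Table State with Stats #$version - $redactedPath"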

withStatsInternal0 DataFrame

withStatsInternal0: DataFrame

withStatsInternal0 is the allFiles Dataset with the statistics parsed (the stats column decoded from JSON).
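A hedged, stand-alone sketch of what parsing the statistics amounts to: decoding the JSON-encoded stats column with a schema. The statsSchema below (numRecords plus min/max values) is a simplified assumption; the real schema is derived from the table schema.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Simplified (assumed) statistics schema
val statsSchema = StructType(Seq(
  StructField("numRecords", LongType),
  StructField("minValues", MapType(StringType, StringType)),
  StructField("maxValues", MapType(StringType, StringType))))

// allFiles is the Dataset[AddFile] from the contract above
val withParsedStats = allFiles.toDF()
  .withColumn("stats", from_json(col("stats"), statsSchema))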

filesForScan

filesForScan(
  projection: Seq[Attribute],
  filters: Seq[Expression]): DeltaScan
filesForScan(
  projection: Seq[Attribute],
  filters: Seq[Expression],
  keepNumRecords: Boolean): DeltaScan

The two-argument variant uses the keepNumRecords flag disabled (false).

filesForScan is part of the DeltaScanGeneratorBase abstraction.

filesForScan branches off based on the given filter expressions and the schema.

If the given filter expressions are empty or TrueLiteral only, or the schema is empty, filesForScan executes the delta.skipping.none code path.

If there are partition-based filter expressions only (among the given filters), filesForScan executes the delta.skipping.partition code path. Otherwise, filesForScan executes the delta.skipping.data code path.
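A rough sketch (not the actual implementation) of that decision; isPartitionOnly is a hypothetical helper and partitionColumns is assumed to be known:

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: a filter is partition-only if it references
// nothing but partition columns
def isPartitionOnly(e: Expression, partitionColumns: Set[String]): Boolean =
  e.references.forall(a => partitionColumns.contains(a.name))

def codePath(
    filters: Seq[Expression],
    schema: StructType,
    partitionColumns: Set[String]): String =
  if (filters.isEmpty || filters.forall(_ == Literal.TrueLiteral) || schema.isEmpty)
    "delta.skipping.none"
  else if (filters.forall(isPartitionOnly(_, partitionColumns)))
    "delta.skipping.partition"
  else
    "delta.skipping.data"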

No Data Skipping

filesForScan...FIXME

delta.skipping.partition

filesForScan...FIXME

delta.skipping.data

filesForScan constructs the final partition filters from the partition filters (among the given filter expressions).

With the spark.databricks.delta.stats.skipping configuration property enabled, filesForScan creates a file skipping predicate expression for every data filter.

filesForScan then executes getDataSkippedFiles with the final partition-only and data skipping filters (which leverages the data skipping statistics to find the set of parquet files that need to be queried).

In the end, filesForScan creates a DeltaScan (with the files and sizes, and the dataSkippingOnlyV1 or dataSkippingAndPartitionFilteringV1 data skipping type).

getDataSkippedFiles

getDataSkippedFiles(
  partitionFilters: Column,
  dataFilters: DataSkippingPredicate,
  keepNumRecords: Boolean): (Seq[AddFile], Seq[DataSize])

getDataSkippedFiles builds the size collectors and the filter functions:

Size Collector | Filter Function
totalSize | totalFilter
partitionSize | partitionFilter
scanSize | scanFilter
Size Collectors are Accumulators

The size collectors are ArrayAccumulators that are AccumulatorV2s (Spark Core).

class ArrayAccumulator(val size: Int)
  extends AccumulatorV2[(Int, Long), Array[Long]]
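A hedged, self-contained sketch of such an accumulator against the AccumulatorV2 API (the class name and the sum-based merge semantics are assumptions, not the real ArrayAccumulator):

import org.apache.spark.util.AccumulatorV2

// Keeps a fixed-size Array[Long]; add() accumulates (index, value) pairs
class SketchArrayAccumulator(val size: Int)
    extends AccumulatorV2[(Int, Long), Array[Long]] {

  private val values = Array.fill[Long](size)(0L)

  override def isZero: Boolean = values.forall(_ == 0L)

  override def copy(): SketchArrayAccumulator = {
    val acc = new SketchArrayAccumulator(size)
    values.copyToArray(acc.values)
    acc
  }

  override def reset(): Unit = java.util.Arrays.fill(values, 0L)

  override def add(v: (Int, Long)): Unit = values(v._1) += v._2

  override def merge(other: AccumulatorV2[(Int, Long), Array[Long]]): Unit =
    other.value.zipWithIndex.foreach { case (v, i) => values(i) += v }

  override def value: Array[Long] = values
}

Once registered with the SparkContext (e.g. sparkContext.register(acc, "totalSize")), such an accumulator can be updated from tasks and read back on the driver, which is how the size collectors gather per-query sizes.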

getDataSkippedFiles takes the withStats DataFrame and adds the following WHERE clauses (creating a filteredFiles dataset; see the sketch after the list):

  1. The above totalFilter with trueLiteral
  2. The above partitionFilter with the given partitionFilters
  3. The above scanFilter with the given dataFilters and a negation of verifyStatsForFilter
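As a sketch, the chaining could look as follows; the names (withStats, totalFilter, partitionFilter, scanFilter, verifyStatsForFilter, partitionFilters, dataFilters) are the ones introduced on this page, but the exact way the data filters and the stats verification are combined is an assumption:

import org.apache.spark.sql.functions.lit

val filteredFiles = withStats
  .where(totalFilter(lit(true)))             // 1. count every file
  .where(partitionFilter(partitionFilters))  // 2. partition pruning
  .where(scanFilter(                         // 3. data skipping
    dataFilters.expr || !verifyStatsForFilter(dataFilters.referencedStats)))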

getDataSkippedFiles adds stats column that includes numRecords stats when the given keepNumRecords flag is enabled.

keepNumRecords Flag is Disabled

The given keepNumRecords flag is always off (false).

In the end, getDataSkippedFiles returns the rows (as AddFiles) and the DataSizes based on the following ArrayAccumulators:

  1. totalSize
  2. partitionSize
  3. scanSize
Dataset.collect

getDataSkippedFiles uses the Dataset.collect action to collect the rows, which runs the underlying Spark SQL query and hence a Spark job.

buildSizeCollectorFilter

buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column)

buildSizeCollectorFilter...FIXME

verifyStatsForFilter

verifyStatsForFilter(
  referencedStats: Set[StatsColumn]): Column

verifyStatsForFilter...FIXME

Column Mapping Mode

columnMappingMode: DeltaColumnMappingMode

columnMappingMode is the value of the columnMapping.mode table property (in the Metadata).
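A hedged sketch of reading that property from the Metadata: configuration is a Map[String, String], and the fully-qualified key and default below (delta.columnMapping.mode, none) are assumptions:

// metadata is the Metadata from the contract above
val mappingMode: String =
  metadata.configuration.getOrElse("delta.columnMapping.mode", "none")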