DataSkippingReaderBase

DataSkippingReaderBase is an extension of the DeltaScanGenerator abstraction for DeltaScan generators.

The heart of DataSkippingReaderBase (and Data Skipping in general) is the withStats DataFrame.

Contract

allFiles Dataset (of AddFiles)

allFiles: Dataset[AddFile]

Dataset of AddFiles


DeltaLog

deltaLog: DeltaLog

DeltaLog


Metadata

metadata: Metadata

Metadata


numOfFilesOpt

numOfFilesOpt: Option[Long]


Path

path: Path

Hadoop Path

Redacted Path

redactedPath: String


Schema

schema: StructType


sizeInBytes

sizeInBytes: Long


version

version: Long


Implementations

stats.skipping

DataSkippingReaderBase uses the spark.databricks.delta.stats.skipping configuration property for filesForScan.
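
For example, stats-based skipping could be turned off for a session as follows (a sketch; spark is assumed to be an active SparkSession):

// A sketch: disable stats-based data skipping for the current session
spark.conf.set("spark.databricks.delta.stats.skipping", "false")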

withStats DataFrame

withStats: DataFrame

withStats returns the withStatsInternal DataFrame.

Final Method

withStats is a Scala final method and may not be overridden in subclasses.

Learn more in the Scala Language Specification.
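
A minimal Scala illustration (the class and member names below are made up for the example):

abstract class Generator {
  // final: the member cannot be overridden in subclasses
  final def withStats: String = "stats"
}
class CustomGenerator extends Generator {
  // override def withStats: String = "other" // would not compile: cannot override final member
}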


withStats is used when getDataSkippedFiles is executed (to build the filtered files DataFrame).

withStatsInternal DataFrame

withStatsInternal: DataFrame

withStatsInternal requests the withStatsCache for the cached DataFrame (the DS).

withStatsCache

withStatsCache: CachedDS[Row]

Lazy Value

withStatsCache is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.

Learn more in the Scala Language Specification.
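
A minimal Scala illustration (names made up for the example):

class Reader {
  lazy val cache: Long = {
    println("initializing (runs once, on first access)")
    System.nanoTime()
  }
}
val r = new Reader
r.cache == r.cache // true: the value is computed once and reused afterwards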

withStatsCache caches the withStatsInternal0 DataFrame under the following name (with the version and the redactedPath):

Delta Table State with Stats #[version] - [redactedPath]

withStatsInternal0 DataFrame

withStatsInternal0: DataFrame

withStatsInternal0 is the allFiles Dataset with the statistics parsed (the stats column decoded from JSON).
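
A sketch of the decoding (simplified; statsSchema stands for a hypothetical StructType describing the collected statistics, and allFiles is the contract's Dataset[AddFile]):

import org.apache.spark.sql.functions.{col, from_json}
// Decode the JSON-encoded stats column into a struct column
val withParsedStats = allFiles.toDF()
  .withColumn("stats", from_json(col("stats"), statsSchema))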

filesForScan

Signature
filesForScan(
  limit: Long): DeltaScan
filesForScan(
  limit: Long,
  partitionFilters: Seq[Expression]): DeltaScan
filesForScan(
  filters: Seq[Expression],
  keepNumRecords: Boolean): DeltaScan

filesForScan is part of the DeltaScanGenerator abstraction.

filesForScan branches off based on the given filters expressions and the schema.

If the given filters expressions are either TrueLiteral or empty, or the schema is empty, filesForScan executes the delta.skipping.none code path.

If there are only partition-based filter expressions among the given filters expressions, filesForScan executes the delta.skipping.partition code path. Otherwise, filesForScan executes the delta.skipping.data code path.
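
The branching could be sketched as follows (simplified pseudologic, not the exact implementation; isPartitionFilter stands for a hypothetical predicate that tells partition filters apart):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.types.StructType

// A sketch of choosing among the three code paths
def chooseCodePath(
    filters: Seq[Expression],
    schema: StructType,
    isPartitionFilter: Expression => Boolean): String = {
  if (filters.forall(_ == TrueLiteral) || schema.isEmpty) {
    "delta.skipping.none"        // no filters (or only TrueLiteral), or an empty schema
  } else if (filters.forall(isPartitionFilter)) {
    "delta.skipping.partition"   // partition-based filters only
  } else {
    "delta.skipping.data"        // at least one data filter
  }
}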

No Data Skipping

filesForScan...FIXME

delta.skipping.partition

filesForScan...FIXME

delta.skipping.data

filesForScan builds the final partition filters from the partition filters among the given filters expressions.

With spark.databricks.delta.stats.skipping configuration property enabled, filesForScan creates a file skipping predicate expression for every data filter.

filesForScan executes getDataSkippedFiles with the final partition-only and data skipping filters (which leverages the data skipping statistics to find the set of parquet files that need to be queried).

In the end, filesForScan creates a DeltaScan (with the files and sizes, and the dataSkippingOnlyV1 or dataSkippingAndPartitionFilteringV1 data skipping type).

buildSizeCollectorFilter

buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column)

buildSizeCollectorFilter...FIXME

verifyStatsForFilter

verifyStatsForFilter(
  referencedStats: Set[StatsColumn]): Column

verifyStatsForFilter...FIXME

pruneFilesByLimit

pruneFilesByLimit(
  df: DataFrame,
  limit: Long): ScanAfterLimit

pruneFilesByLimit...FIXME

getFilesAndNumRecords

getFilesAndNumRecords(
  df: DataFrame): Iterator[(AddFile, NumRecords)]

getFilesAndNumRecords gets AddFiles and the number of records within each file (based on the numRecords statistic) for pruneFilesByLimit.


getFilesAndNumRecords adds the following columns to the given DataFrame:

Name                 Expression
numPhysicalRecords   numRecords
numLogicalRecords    numRecords
stats                null string literal

getFilesAndNumRecords projects the DataFrame (using DataFrame.select) to create a DataFrame of AddFiles and NumRecords instances (out of the numPhysicalRecords and numLogicalRecords columns).
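
The extra columns could be sketched as follows (simplified, assuming the parsed stats column exposes a numRecords field; df is the input DataFrame):

import org.apache.spark.sql.functions.{col, lit}
// Pull the record count out of the parsed statistics and
// null out the (potentially large) stats JSON afterwards
val withRecordCounts = df
  .withColumn("numPhysicalRecords", col("stats.numRecords"))
  .withColumn("numLogicalRecords", col("stats.numRecords"))
  .withColumn("stats", lit(null).cast("string"))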

Column Mapping Mode

columnMappingMode: DeltaColumnMappingMode

columnMappingMode is the value of the columnMapping.mode table property (in the Metadata).
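
As a sketch, the lookup boils down to reading the table configuration with a "none" default (the actual implementation goes through the Delta table properties machinery):

// metadata is the Metadata of the contract; delta.columnMapping.mode is the full property key
val mode = metadata.configuration.getOrElse("delta.columnMapping.mode", "none")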

getStatsColumnOpt

getStatsColumnOpt(
  stat: StatsColumn): Option[Column] // (1)!
getStatsColumnOpt(
  statType: String,
  pathToColumn: Seq[String] = Nil): Option[Column]
  1. Uses statType and pathToColumn of the given StatsColumn

getStatsColumnOpt resolves the given pathToColumn to a Column to access a requested statType statistics.


getStatsColumnOpt looks up the statType in the statistics schema (by name). If not available, getStatsColumnOpt returns None (an undefined value) immediately.

getStatsColumnOpt...FIXME

getStatsColumnOpt filters out non-leaf StructType columns as they lack statistics and skipping predicates can't use them.

Due to the JSON truncation of timestamps to milliseconds, for the maxValues statistic of TimestampType columns, getStatsColumnOpt adjusts the value 1 millisecond upwards (to include records that differ only in microsecond precision).
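
The adjustment could be sketched as follows (simplified; maxTimestampCol stands for a hypothetical maxValues column of TimestampType):

import org.apache.spark.sql.functions.expr
// Widen the millisecond-truncated upper bound by 1 ms so records that
// differ only at microsecond precision are not skipped by mistake
val adjustedMax = maxTimestampCol + expr("INTERVAL 1 MILLISECOND")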


getStatsColumnOpt is used when getStatsColumnOrNullLiteral resolves a statistic column.

getStatsColumnOrNullLiteral

getStatsColumnOrNullLiteral(
  stat: StatsColumn): Column // (1)!
getStatsColumnOrNullLiteral(
  statType: String,
  pathToColumn: Seq[String] = Nil): Column
  1. Uses statType and pathToColumn of the given StatsColumn

getStatsColumnOrNullLiteral requests getStatsColumnOpt (for the statType and pathToColumn) and, when no such column is available, falls back to lit(null).
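
In other words, a sketch (the hypothetical statsColumnOrNull below mirrors getStatsColumnOrNullLiteral, using getStatsColumnOpt from above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
// Fall back to a null literal when the statistic column cannot be resolved
def statsColumnOrNull(statType: String, pathToColumn: Seq[String] = Nil): Column =
  getStatsColumnOpt(statType, pathToColumn).getOrElse(lit(null))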



getDataSkippedFiles

getDataSkippedFiles(
  partitionFilters: Column,
  dataFilters: DataSkippingPredicate,
  keepNumRecords: Boolean): (Seq[AddFile], Seq[DataSize])

getDataSkippedFiles builds the size collectors and the filter functions:

Size Collector   Column => Column Filter Function
totalSize        totalFilter
partitionSize    partitionFilter
scanSize         scanFilter
Size Collectors are Accumulators

The size collectors are ArrayAccumulators that are AccumulatorV2s (Spark Core).

class ArrayAccumulator(val size: Int)
  extends AccumulatorV2[(Int, Long), Array[Long]]
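
Like any AccumulatorV2, an ArrayAccumulator has to be registered with the SparkContext before tasks can update it; a sketch (the size of 1 and the name are illustrative, spark is an active SparkSession):

// Register a size collector so per-task updates are merged on the driver
val totalSize = new ArrayAccumulator(1)
spark.sparkContext.register(totalSize, "totalSize")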

getDataSkippedFiles takes the withStats DataFrame and adds the following WHERE clauses (and creates a filteredFiles dataset):

  1. totalFilter with Literal.TrueLiteral
  2. partitionFilter with the given partitionFilters
  3. scanFilter with the given dataFilters OR-ed with a negation of verifyStatsForFilter (on the referenced statistics of the dataFilters)

Note

At this point, getDataSkippedFiles has built a DataFrame that is a filtered withStats DataFrame.
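
Put together, the filtering could be sketched as follows (simplified; the field names expr and referencedStats on DataSkippingPredicate are assumptions of this sketch):

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Literal
// Chain the three instrumented filters over the withStats DataFrame
val filteredFiles = withStats
  .where(totalFilter(new Column(Literal.TrueLiteral)))
  .where(partitionFilter(partitionFilters))
  .where(scanFilter(dataFilters.expr || !verifyStatsForFilter(dataFilters.referencedStats)))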

With the given keepNumRecords flag enabled, getDataSkippedFiles adds a JSON-encoded numRecords column (based on the stats.numRecords column):

to_json(struct(col("stats.numRecords") as 'numRecords))
keepNumRecords flag is always disabled

The given keepNumRecords flag is always off (false) per the default value of filesForScan.

In the end, getDataSkippedFiles converts the filtered DataFrame to AddFiles and the DataSizes based on the following ArrayAccumulators:

  1. totalSize
  2. partitionSize
  3. scanSize

convertDataFrameToAddFiles

convertDataFrameToAddFiles(
  df: DataFrame): Array[AddFile]

convertDataFrameToAddFiles converts the given DataFrame (a Dataset[Row]) to a Dataset[AddFile] and executes Dataset.collect operator.
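
Roughly, a sketch (assuming an implicit Encoder[AddFile] is in scope):

// Go from Dataset[Row] to typed AddFiles and materialize them on the driver
def toAddFiles(df: DataFrame): Array[AddFile] =
  df.as[AddFile].collect()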

web UI

Dataset.collect is an action and can be tracked in the web UI.


convertDataFrameToAddFiles is used when getDataSkippedFiles converts the filtered DataFrame to AddFiles.