DataSkippingReaderBase¶
DataSkippingReaderBase is an extension of the DeltaScanGenerator abstraction for DeltaScan generators.
The heart of DataSkippingReaderBase (and Data Skipping in general) is the withStats DataFrame.
Contract¶
allFiles Dataset (of AddFiles)¶
allFiles: Dataset[AddFile]
Dataset of AddFiles
Used when:
DataSkippingReaderBase is requested to withStatsInternal0, withNoStats, getAllFiles, filterOnPartitions, getSpecificFilesWithStats
DeltaLog¶
deltaLog: DeltaLog
Used when:
DataSkippingReaderBase is requested to filesForScan
Metadata¶
metadata: Metadata
Used when:
DataSkippingReaderBase is requested for the columnMappingMode, and to getStatsColumnOpt, filesWithStatsForScan, constructPartitionFilters, filterOnPartitions, filesForScan
numOfFilesOpt¶
numOfFilesOpt: Option[Long]
Used when:
DataSkippingReaderBase is requested to filesForScan
Path¶
path: Path
Hadoop Path
Redacted Path¶
redactedPath: String
Used when:
DataSkippingReaderBase is requested to withStatsCache
Schema¶
schema: StructType
Used when:
DataSkippingReaderBase is requested to filesForScan
sizeInBytes¶
sizeInBytes: Long
Used when:
DataSkippingReaderBase is requested to filesForScan
version¶
version: Long
Used when:
DataSkippingReaderBase is requested to withStatsCache, filesForScan
Implementations¶
stats.skipping¶
DataSkippingReaderBase uses the spark.databricks.delta.stats.skipping configuration property for filesForScan.
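For example, the property can be toggled at the session level (an illustrative snippet, using the property name above):

```scala
// Illustrative only: disable stats-based data skipping for the current session
spark.conf.set("spark.databricks.delta.stats.skipping", "false")
```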
withStats DataFrame¶
withStats: DataFrame
withStats is the withStatsInternal DataFrame.
Final Method
withStats is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
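A minimal illustration (not from the Delta Lake codebase):

```scala
abstract class Generator {
  final def withStats: String = "fixed"  // final: subclasses cannot override
}
class Impl extends Generator
// class Broken extends Generator {
//   override def withStats: String = "boom"  // does not compile: cannot override final member
// }
```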
withStats is used when:
DataSkippingReaderBase is requested to filesWithStatsForScan, getAllFiles, filterOnPartitions, getDataSkippedFiles, filesForScan
withStatsInternal DataFrame¶
withStatsInternal: DataFrame
withStatsInternal requests the withStatsCache for the cached Dataset (the DS).
withStatsCache¶
withStatsCache: CachedDS[Row]
Lazy Value
withStatsCache is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
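A minimal illustration of the semantics:

```scala
object LazyValDemo extends App {
  lazy val cached: Long = {
    println("initializing")  // printed exactly once
    42L
  }
  println(cached)  // triggers initialization: prints "initializing", then 42
  println(cached)  // reuses the computed value: prints 42
}
```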
withStatsCache caches the withStatsInternal0 DataFrame under the following name (with the version and the redactedPath):
Delta Table State with Stats #[version] - [redactedPath]
withStatsInternal0 DataFrame¶
withStatsInternal0: DataFrame
withStatsInternal0 is the allFiles Dataset with the statistics parsed (the stats column decoded from JSON).
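A minimal sketch of the decoding step, assuming a hypothetical statsSchema that matches the JSON-encoded stats column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StructType}

object StatsDecodingSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("stats-decoding").getOrCreate()
  import spark.implicits._

  // Hypothetical stand-in for the allFiles Dataset: a file path and JSON-encoded stats
  val files = Seq(
    ("part-00000.parquet", """{"numRecords":100,"minValues":{"id":1},"maxValues":{"id":100}}""")
  ).toDF("path", "stats")

  // Hypothetical statistics schema matching the JSON above
  val statsSchema = new StructType()
    .add("numRecords", LongType)
    .add("minValues", new StructType().add("id", LongType))
    .add("maxValues", new StructType().add("id", LongType))

  // Decode the JSON stats column into a struct (conceptually what withStatsInternal0 does)
  val withStats = files.withColumn("stats", from_json(col("stats"), statsSchema))
  withStats.select(col("path"), col("stats.numRecords")).show(truncate = false)
}
```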
filesForScan¶
Signature
filesForScan(
limit: Long): DeltaScan
filesForScan(
limit: Long,
partitionFilters: Seq[Expression]): DeltaScan
filesForScan(
filters: Seq[Expression],
keepNumRecords: Boolean): DeltaScan
filesForScan is part of the DeltaScanGenerator abstraction.
filesForScan branches off based on the given filters expressions and the schema.
If the given filters expressions are either TrueLiteral or empty, or the schema is empty, filesForScan executes the delta.skipping.none code path.
If there are partition-based filter expressions only (among the filters expressions), filesForScan executes the delta.skipping.partition code path. Otherwise, filesForScan executes the delta.skipping.data code path.
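A conceptual sketch of the branching (isPartitionFilter is a hypothetical helper, not the actual code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.StructType

// Hypothetical: true for expressions that reference partition columns only
def isPartitionFilter(e: Expression): Boolean = ???

// Conceptual sketch only; the actual filesForScan is considerably more involved
def chooseCodePath(filters: Seq[Expression], schema: StructType): String = {
  val noFilters = filters.isEmpty || filters.forall(_ == Literal.TrueLiteral)
  val (partitionFilters, dataFilters) = filters.partition(isPartitionFilter)
  if (noFilters || schema.isEmpty) "delta.skipping.none"
  else if (dataFilters.isEmpty) "delta.skipping.partition"
  else "delta.skipping.data"
}
```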
No Data Skipping¶
filesForScan...FIXME
delta.skipping.partition¶
filesForScan...FIXME
delta.skipping.data¶
filesForScan constructs the final partition filters from the partition filters (among the given filters expressions).
With the spark.databricks.delta.stats.skipping configuration property enabled, filesForScan creates a file skipping predicate expression for every data filter.
filesForScan executes getDataSkippedFiles with the final partition-only and data skipping filters (leveraging data skipping statistics to find the set of parquet files that need to be queried).
In the end, filesForScan creates a DeltaScan (with the files and sizes, and the dataSkippingOnlyV1 or dataSkippingAndPartitionFilteringV1 data skipping type).
buildSizeCollectorFilter¶
buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column)
buildSizeCollectorFilter...FIXME
verifyStatsForFilter¶
verifyStatsForFilter(
referencedStats: Set[StatsColumn]): Column
verifyStatsForFilter...FIXME
pruneFilesByLimit¶
pruneFilesByLimit(
df: DataFrame,
limit: Long): ScanAfterLimit
pruneFilesByLimit...FIXME
getFilesAndNumRecords¶
getFilesAndNumRecords(
df: DataFrame): Iterator[(AddFile, NumRecords)]
getFilesAndNumRecords gets AddFiles and the number of records within each file (based on the numRecords statistic) for pruneFilesByLimit.
getFilesAndNumRecords adds the following columns to the given DataFrame:
| Name | Expression |
|---|---|
| numPhysicalRecords | numRecords |
| numLogicalRecords | numRecords |
| stats | null string literal |
getFilesAndNumRecords projects the DataFrame (using DataFrame.select) to create a Dataset of (AddFile, NumRecords) pairs (out of the numPhysicalRecords and numLogicalRecords columns).
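A sketch of the column wiring described above (assuming df carries a decoded stats struct):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StringType

// Sketch: expose the record counts and null out the (no longer needed) stats column
def addRecordCountColumns(df: DataFrame): DataFrame =
  df.withColumn("numPhysicalRecords", col("stats.numRecords"))
    .withColumn("numLogicalRecords", col("stats.numRecords"))
    .withColumn("stats", lit(null).cast(StringType))
```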
Column Mapping Mode¶
columnMappingMode: DeltaColumnMappingMode
columnMappingMode is the value of the columnMapping.mode table property (in the Metadata).
getStatsColumnOpt¶
getStatsColumnOpt(
stat: StatsColumn): Option[Column] // (1)!
getStatsColumnOpt(
statType: String,
pathToColumn: Seq[String] = Nil): Option[Column]
1. Uses statType and pathToColumn of the given StatsColumn
getStatsColumnOpt resolves the given pathToColumn to a Column to access a requested statType statistics.
getStatsColumnOpt looks up the statType in the statistics schema (by name). If not available, getStatsColumnOpt returns None (an undefined value) immediately.
getStatsColumnOpt...FIXME
getStatsColumnOpt filters out non-leaf StructType columns as they lack statistics, so skipping predicates cannot use them.
Due to JSON truncation of timestamps to milliseconds, for the maxValues statistic of TimestampType columns, getStatsColumnOpt adjusts the value 1 millisecond upwards (to include records that differ only in microsecond precision).
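A sketch of the adjustment, assuming c is the resolved maxValues column of a TimestampType field:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

// Stats are JSON-truncated to millisecond precision, so the recorded maximum can be
// up to 1 millisecond below the true maximum; widen it by 1 millisecond so records
// that differ only in microseconds still match (sketch only)
def adjustTimestampMax(c: Column): Column = c + expr("INTERVAL 1 MILLISECOND")
```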
getStatsColumnOpt is used when:
DataSkippingReaderBase is requested to getStatsColumnOrNullLiteral and getStatsColumnOpt
DataFiltersBuilder is created
getStatsColumnOrNullLiteral¶
getStatsColumnOrNullLiteral(
stat: StatsColumn): Column // (1)!
getStatsColumnOrNullLiteral(
statType: String,
pathToColumn: Seq[String] = Nil): Column
1. Uses statType and pathToColumn of the given StatsColumn
getStatsColumnOrNullLiteral returns getStatsColumnOpt (for the statType and pathToColumn), if available, or falls back to lit(null).
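In effect (a sketch that mirrors the description above):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit

// Sketch: resolve the statistic or fall back to a null literal
def statsColumnOrNullLiteral(statType: String, pathToColumn: Seq[String] = Nil): Column =
  getStatsColumnOpt(statType, pathToColumn).getOrElse(lit(null))
```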
getStatsColumnOrNullLiteral is used when:
DataSkippingReaderBase is requested to verifyStatsForFilter, buildSizeCollectorFilter
getDataSkippedFiles¶
getDataSkippedFiles(
partitionFilters: Column,
dataFilters: DataSkippingPredicate,
keepNumRecords: Boolean): (Seq[AddFile], Seq[DataSize])
getDataSkippedFiles builds the size collectors and the filter functions:
| Size Collector | Column => Column Filter Function |
|---|---|
| totalSize | totalFilter |
| partitionSize | partitionFilter |
| scanSize | scanFilter |
Size Collectors are Accumulators
The size collectors are ArrayAccumulators that are AccumulatorV2s (Spark Core).
class ArrayAccumulator(val size: Int)
extends AccumulatorV2[(Int, Long), Array[Long]]
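A minimal illustration of the AccumulatorV2 contract shown above (register on the driver, add from tasks, read the merged value back):

```scala
// Illustrative only
val scanSize = new ArrayAccumulator(size = 3)
spark.sparkContext.register(scanSize, "scanSize")
// tasks contribute (index, delta) pairs, e.g. scanSize.add((0, fileSizeInBytes))
// the driver reads the merged Array[Long] via scanSize.value
```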
getDataSkippedFiles takes the withStats DataFrame and adds the following WHERE clauses (and creates a filteredFiles dataset):
- totalFilter with Literal.TrueLiteral
- partitionFilter with the given partitionFilters
- scanFilter with the given dataFilters or a negation of verifyStatsForFilter (with the referenced statistics of the dataFilters)
Note
At this point, getDataSkippedFiles has built a DataFrame that is a filtered withStats DataFrame.
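A rough sketch of the chaining (the filter functions come from the table above; dataSkippingExpr stands in for the column built from the dataFilters and the verifyStatsForFilter negation):

```scala
import org.apache.spark.sql.functions.lit

// Rough sketch only, not the actual implementation
val filteredFiles = withStats
  .where(totalFilter(lit(true)))
  .where(partitionFilter(partitionFilters))
  .where(scanFilter(dataSkippingExpr))
```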
With the given keepNumRecords flag enabled, getDataSkippedFiles adds a JSON-encoded numRecords column (based on the stats.numRecords column).
to_json(struct(col("stats.numRecords") as 'numRecords))
keepNumRecords flag is always disabled
The given keepNumRecords flag is always off (false) per the default value of filesForScan.
In the end, getDataSkippedFiles converts the filtered DataFrame to AddFiles and the DataSizes based on the following ArrayAccumulators:
- totalSize
- partitionSize
- scanSize
convertDataFrameToAddFiles¶
convertDataFrameToAddFiles(
df: DataFrame): Array[AddFile]
convertDataFrameToAddFiles converts the given DataFrame (a Dataset[Row]) to a Dataset[AddFile] and executes the Dataset.collect operator.
web UI
Dataset.collect is an action and can be tracked in web UI.
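A sketch of the conversion, assuming an implicit Encoder[AddFile] is in scope:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: re-type the rows as AddFiles and collect them to the driver
def toAddFiles(df: DataFrame): Array[AddFile] =
  df.as[AddFile].collect()  // requires an implicit Encoder[AddFile]
```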
convertDataFrameToAddFiles is used when:
DataSkippingReaderBase is requested to getAllFiles, filterOnPartitions, getDataSkippedFiles, getSpecificFilesWithStats