DataSkippingReaderBase¶
DataSkippingReaderBase
is an extension of the DeltaScanGenerator abstraction for DeltaScan generators.
The heart of DataSkippingReaderBase
(and Data Skipping in general) is the withStats DataFrame.
Contract¶
allFiles Dataset (of AddFiles)¶
allFiles: Dataset[AddFile]
Dataset
of AddFiles
Used when:
DataSkippingReaderBase
is requested to withStatsInternal0, withNoStats, getAllFiles, filterOnPartitions, getSpecificFilesWithStats
DeltaLog¶
deltaLog: DeltaLog
Used when:
DataSkippingReaderBase
is requested to filesForScan
Metadata¶
metadata: Metadata
Used when:
DataSkippingReaderBase
is requested for the columnMappingMode, and to getStatsColumnOpt, filesWithStatsForScan, constructPartitionFilters, filterOnPartitions, filesForScan
numOfFilesOpt¶
numOfFilesOpt: Option[Long]
Used when:
DataSkippingReaderBase
is requested to filesForScan
Path¶
path: Path
Hadoop Path
Redacted Path¶
redactedPath: String
Used when:
DataSkippingReaderBase
is requested to withStatsCache
Schema¶
schema: StructType
Used when:
DataSkippingReaderBase
is requested to filesForScan
sizeInBytes¶
sizeInBytes: Long
Used when:
DataSkippingReaderBase
is requested to filesForScan
version¶
version: Long
Used when:
DataSkippingReaderBase
is requested to withStatsCache, filesForScan
Implementations¶
stats.skipping¶
DataSkippingReaderBase
uses spark.databricks.delta.stats.skipping configuration property for filesForScan.
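For instance, the property can be toggled on a SparkSession before scanning a Delta table (a minimal sketch, assuming an active SparkSession named spark):

// Minimal sketch: toggling stats-based data skipping used by filesForScan
spark.conf.set("spark.databricks.delta.stats.skipping", "false")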
withStats DataFrame¶
withStats: DataFrame
withStats
returns the withStatsInternal DataFrame.
Final Method
withStats
is a Scala final method and may not be overridden in subclasses.
Learn more in the Scala Language Specification.
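For illustration only (not from the Delta sources), a Scala final method cannot be overridden:

// Illustrative sketch: `final` prevents subclasses from overriding the method
trait Scans {
  final def withStats: String = "cached stats"
}
class MyScans extends Scans {
  // override def withStats: String = ???  // would not compile: the method is final
}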
withStats
is used when:
DataSkippingReaderBase
is requested to filesWithStatsForScan, getAllFiles, filterOnPartitions, getDataSkippedFiles, filesForScan
withStatsInternal DataFrame¶
withStatsInternal: DataFrame
withStatsInternal
requests the withStatsCache for the cached Dataset (DS).
withStatsCache¶
withStatsCache: CachedDS[Row]
Lazy Value
withStatsCache
is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards.
Learn more in the Scala Language Specification.
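For illustration only (not from the Delta sources), a Scala lazy val is initialized on first access and the result is reused afterwards:

// Illustrative sketch: the initializer runs once, on first access
class Cache {
  lazy val expensive: Long = {
    println("computing...")  // printed only the first time `expensive` is read
    System.nanoTime()
  }
}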
withStatsCache
caches the withStatsInternal0 DataFrame under the following name (with the version and the redactedPath):
Delta Table State with Stats #[version] - [redactedPath]
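The name boils down to plain string interpolation (a sketch with hypothetical version and redactedPath values):

// Sketch only: composing the cache name from version and redactedPath
val version = 42L                              // hypothetical table version
val redactedPath = "dbfs:/redacted/my-table"   // hypothetical redacted location
val cacheName = s"Delta Table State with Stats #$version - $redactedPath"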
withStatsInternal0 DataFrame¶
withStatsInternal0: DataFrame
withStatsInternal0
is the allFiles Dataset with the statistics parsed (the stats
column decoded from JSON).
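Conceptually, decoding the stats column amounts to from_json with the statistics schema (a simplified sketch, not the actual implementation; the schema below is trimmed down to numRecords only):

// Sketch only: parse the JSON-encoded `stats` column of allFiles
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StructType}
val statsSchema = new StructType().add("numRecords", LongType)  // simplified stats schema
val withParsedStats = allFiles.toDF().withColumn("stats", from_json(col("stats"), statsSchema))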
filesForScan¶
Signature
filesForScan(
limit: Long): DeltaScan
filesForScan(
limit: Long,
partitionFilters: Seq[Expression]): DeltaScan
filesForScan(
filters: Seq[Expression],
keepNumRecords: Boolean): DeltaScan
filesForScan
is part of the DeltaScanGenerator abstraction.
filesForScan
branches off based on the given filters
expressions and the schema.
If the given filters expressions are either TrueLiteral or empty, or the schema is empty, filesForScan executes the delta.skipping.none code path.
If there are only partition-based filter expressions (among the filters expressions), filesForScan executes the delta.skipping.partition code path. Otherwise, filesForScan executes the delta.skipping.data code path.
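The branching can be sketched as follows (a simplified sketch; the method and flag names are illustrative, not the actual signature):

// Sketch only: the three code paths of filesForScan
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
def chooseCodePath(
    filters: Seq[Expression],
    schemaIsEmpty: Boolean,
    partitionFiltersOnly: Boolean): String =
  if (filters.isEmpty || filters.forall(_ == TrueLiteral) || schemaIsEmpty) "delta.skipping.none"
  else if (partitionFiltersOnly) "delta.skipping.partition"
  else "delta.skipping.data"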
No Data Skipping¶
filesForScan
...FIXME
delta.skipping.partition¶
filesForScan
...FIXME
delta.skipping.data¶
filesForScan
constructs the final partition filters with the partition filters (of the given filters
expressions).
With spark.databricks.delta.stats.skipping configuration property enabled, filesForScan
creates a file skipping predicate expression for every data filter.
filesForScan executes getDataSkippedFiles with the final partition-only and data skipping filters (which leverages data skipping statistics to find the set of parquet files that need to be queried).
In the end, filesForScan creates a DeltaScan (with the files and sizes, and dataSkippingOnlyV1
or dataSkippingAndPartitionFilteringV1
data skipping types).
buildSizeCollectorFilter¶
buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column)
buildSizeCollectorFilter
...FIXME
verifyStatsForFilter¶
verifyStatsForFilter(
referencedStats: Set[StatsColumn]): Column
verifyStatsForFilter
...FIXME
pruneFilesByLimit¶
pruneFilesByLimit(
df: DataFrame,
limit: Long): ScanAfterLimit
pruneFilesByLimit
...FIXME
getFilesAndNumRecords¶
getFilesAndNumRecords(
df: DataFrame): Iterator[(AddFile, NumRecords)]
getFilesAndNumRecords
gets AddFiles and the number of records within each file (based on numRecords
statistic) for pruneFilesByLimit.
getFilesAndNumRecords
adds the following columns to the given DataFrame:
Name | Expression |
---|---|
numPhysicalRecords | numRecords |
numLogicalRecords | numRecords |
stats | null string literal |
getFilesAndNumRecords
projects the DataFrame (using DataFrame.select) to create a Dataset of AddFiles and NumRecords (out of the numPhysicalRecords and numLogicalRecords columns).
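A conceptual sketch of the extra columns (names mirror the table above; the actual code then projects to Delta-internal AddFile and NumRecords objects):

// Sketch only: add the record-count columns and blank out the parsed stats
import org.apache.spark.sql.functions.{col, lit}
val withCounts = df
  .withColumn("numPhysicalRecords", col("stats.numRecords"))
  .withColumn("numLogicalRecords", col("stats.numRecords"))
  .withColumn("stats", lit(null).cast("string"))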
Column Mapping Mode¶
columnMappingMode: DeltaColumnMappingMode
columnMappingMode
is the value of columnMapping.mode table property (in the Metadata).
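In effect, the mode comes from the delta.columnMapping.mode entry of the table configuration (a sketch; the actual lookup goes through Delta's table-property helpers):

// Sketch only: read the column mapping mode from the table configuration map
val modeName = metadata.configuration.getOrElse("delta.columnMapping.mode", "none")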
getStatsColumnOpt¶
getStatsColumnOpt(
stat: StatsColumn): Option[Column] // (1)!
getStatsColumnOpt(
statType: String,
pathToColumn: Seq[String] = Nil): Option[Column]
- Uses statType and pathToColumn of the given StatsColumn
getStatsColumnOpt
resolves the given pathToColumn to a Column to access the requested statType statistic.
getStatsColumnOpt
looks up the statType
in the statistics schema (by name). If not available, getStatsColumnOpt
returns None
(an undefined value) immediately.
getStatsColumnOpt
...FIXME
getStatsColumnOpt
filters out non-leaf StructType
columns as they lack statistics and skipping predicates can't use them.
Due to the JSON truncation of timestamps to milliseconds, for the maxValues statistic of TimestampType columns, getStatsColumnOpt adjusts the value 1 millisecond upwards (to include records that differ only in microsecond precision).
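A sketch of the adjustment on a max-value timestamp column (the column path eventTime is hypothetical):

// Sketch only: widen a millisecond-truncated max timestamp by 1 millisecond
import org.apache.spark.sql.functions.{col, expr}
val adjustedMax = col("stats.maxValues.eventTime").cast("timestamp") + expr("INTERVAL 1 millisecond")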
getStatsColumnOpt
is used when:
DataSkippingReaderBase
is requested to getStatsColumnOrNullLiteral and getStatsColumnOpt
DataFiltersBuilder
is created
getStatsColumnOrNullLiteral¶
getStatsColumnOrNullLiteral(
stat: StatsColumn): Column // (1)!
getStatsColumnOrNullLiteral(
statType: String,
pathToColumn: Seq[String] = Nil): Column
- Uses statType and pathToColumn of the given StatsColumn
getStatsColumnOrNullLiteral
returns getStatsColumnOpt (for the statType and pathToColumn), if available, or falls back to lit(null).
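Conceptually, the fallback is a simple getOrElse (a sketch):

// Sketch only: use the resolved stats column if available, otherwise a null literal
import org.apache.spark.sql.functions.lit
val column = getStatsColumnOpt(statType, pathToColumn).getOrElse(lit(null))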
getStatsColumnOrNullLiteral
is used when:
DataSkippingReaderBase
is requested to verifyStatsForFilter, buildSizeCollectorFilter
getDataSkippedFiles¶
getDataSkippedFiles(
partitionFilters: Column,
dataFilters: DataSkippingPredicate,
keepNumRecords: Boolean): (Seq[AddFile], Seq[DataSize])
getDataSkippedFiles
builds the size collectors and the filter functions:
Size Collector | Column => Column Filter Function |
---|---|
totalSize | totalFilter |
partitionSize | partitionFilter |
scanSize | scanFilter |
Size Collectors are Accumulators
The size collectors are ArrayAccumulator
s that are AccumulatorV2
s (Spark Core).
class ArrayAccumulator(val size: Int)
extends AccumulatorV2[(Int, Long), Array[Long]]
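As with any AccumulatorV2, such an accumulator has to be registered with the SparkContext before tasks can add to it (a sketch; assumes an active SparkSession named spark):

// Sketch only: register an ArrayAccumulator and record a value
val acc = new ArrayAccumulator(3)
spark.sparkContext.register(acc, "scanSize")
acc.add((0, 1024L))  // record 1024 bytes at slot 0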
getDataSkippedFiles
takes the withStats DataFrame and adds the following WHERE
clauses (and creates a filteredFiles
dataset):
totalFilter with Literal.TrueLiteral
partitionFilter with the given partitionFilters
scanFilter with the given dataFilters or a negation of verifyStatsForFilter (with the referenced statistics of the dataFilters)
Note
At this point, getDataSkippedFiles
has built a DataFrame
that is a filtered withStats DataFrame.
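A rough sketch of the chain (the filter functions and predicate columns below are placeholders standing in for buildSizeCollectorFilter, the given partitionFilters, the dataFilters predicate and verifyStatsForFilter):

// Rough sketch only: placeholder filters to illustrate the chained WHERE clauses
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, not}
val totalFilter: Column => Column = identity        // placeholder size-collecting filter
val partitionFilter: Column => Column = identity
val scanFilter: Column => Column = identity
val partitionFilterColumn: Column = lit(true)       // placeholder for the given partitionFilters
val dataFilterColumn: Column = lit(true)            // placeholder for the dataFilters predicate
val statsVerified: Column = lit(true)               // placeholder for verifyStatsForFilter(...)
val filteredFiles = withStats
  .where(totalFilter(lit(true)))                    // Literal.TrueLiteral
  .where(partitionFilter(partitionFilterColumn))
  .where(scanFilter(dataFilterColumn || not(statsVerified)))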
With the given keepNumRecords
flag enabled, getDataSkippedFiles
adds a JSON-encoded numRecords column (based on the stats.numRecords column).
to_json(struct(col("stats.numRecords") as 'numRecords))
keepNumRecords flag is always disabled
The given keepNumRecords
flag is always off (false
) per the default value of filesForScan.
In the end, getDataSkippedFiles
converts the filtered DataFrame to AddFiles and the DataSize
s based on the following ArrayAccumulator
s:
totalSize
partitionSize
scanSize
convertDataFrameToAddFiles¶
convertDataFrameToAddFiles(
df: DataFrame): Array[AddFile]
convertDataFrameToAddFiles
converts the given DataFrame
(a Dataset[Row]
) to a Dataset[AddFile]
and executes Dataset.collect
operator.
web UI
Dataset.collect
is an action and can be tracked in web UI.
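Conceptually (a sketch; the real code uses Delta's own AddFile encoder rather than spark.implicits):

// Sketch only: convert the rows to AddFiles and collect them on the driver
import spark.implicits._  // assumes an implicit Encoder[AddFile] can be derived
val addFiles: Array[AddFile] = df.as[AddFile].collect()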
convertDataFrameToAddFiles
is used when:
DataSkippingReaderBase
is requested to getAllFiles, filterOnPartitions, getDataSkippedFiles, getSpecificFilesWithStats