CacheManager
CacheManager is a registry of cached logical query plans. When QueryExecution is requested for a logical query plan with cached data, CacheManager replaces the cached segments of the plan with the corresponding InMemoryRelation logical operators (their cached representation).
Accessing CacheManager
CacheManager is shared across SparkSessions through SharedState.
spark.sharedState.cacheManager
Dataset.cache and persist Operators
A structured query (as Dataset) can be cached and registered with CacheManager using Dataset.cache or Dataset.persist high-level operators.
Cached Queries
cachedData: LinkedList[CachedData]
CacheManager uses the cachedData internal registry to manage cached structured queries as CachedData with InMemoryRelation leaf logical operators.
A new CachedData is added when CacheManager is requested to:
- cacheQuery
- recacheByCondition
A CachedData is removed when CacheManager is requested to:
- uncacheQuery
- recacheByCondition
All CachedData entries are removed (cleared) when CacheManager is requested to clearCache.
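The registry can be pictured as a sequence of (plan, cached representation) pairs guarded by a lock. The following is a simplified model in plain Scala with no Spark dependency. ToyEntry and ToyCacheRegistry are hypothetical names, plans are plain strings, and the cached representation is a placeholder string rather than a real InMemoryRelation.

```scala
// Hypothetical stand-in for CachedData: a plan paired with its cached representation
final case class ToyEntry(plan: String, cachedRepresentation: String)

// Hypothetical stand-in for CacheManager's cachedData registry (not Spark's API)
final class ToyCacheRegistry {
  // Newest entries first; the whole sequence is replaced on every change
  private var cachedData = Vector.empty[ToyEntry]

  // Adding an entry (the registry grows on cacheQuery and recacheByCondition)
  def add(plan: String): Unit = synchronized {
    cachedData = ToyEntry(plan, s"InMemoryRelation($plan)") +: cachedData
  }

  // Removing the entries of one plan (on uncacheQuery and recacheByCondition)
  def remove(plan: String): Unit = synchronized {
    cachedData = cachedData.filterNot(_.plan == plan)
  }

  // Removing every entry (on clearCache)
  def clear(): Unit = synchronized { cachedData = Vector.empty }

  def entries: List[ToyEntry] = synchronized { cachedData.toList }
}
```

Replacing an immutable sequence under a lock keeps readers from ever observing a half-updated registry.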
Re-Caching By Path
recacheByPath(
spark: SparkSession,
resourcePath: String): Unit
recacheByPath(
spark: SparkSession,
resourcePath: Path,
fs: FileSystem): Unit
recacheByPath...FIXME
recacheByPath is used when:
- CatalogImpl is requested to refreshByPath
- InsertIntoHadoopFsRelationCommand command is executed
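recacheByPath is still a FIXME above, so what follows is only a sketch under stated assumptions: a cached plan is assumed to be affected when one of the root paths of its file-based relations, once qualified, starts with the qualified resourcePath (a plain string-prefix check). ToyCachedPlan, ToyRecacheByPath, qualify and the hdfs://nn:8020 default file system are all hypothetical.

```scala
// Hypothetical stand-in for a cached plan that reads files from some root paths
final case class ToyCachedPlan(name: String, rootPaths: Seq[String])

object ToyRecacheByPath {
  // Assumption: qualifying a path makes it absolute with a scheme and authority,
  // modeled here by prefixing a hypothetical default file system URI when missing
  def qualify(path: String, defaultFs: String = "hdfs://nn:8020"): String =
    if (path.contains("://")) path else defaultFs + path

  // Assumption: a plan is affected when one of its qualified root paths
  // starts with the qualified resource path
  def isAffected(plan: ToyCachedPlan, resourcePath: String): Boolean = {
    val prefix = qualify(resourcePath)
    plan.rootPaths.map(qualify(_)).exists(_.startsWith(prefix))
  }

  // The cached plans that would be re-cached for the given resource path
  def plansToRecache(cached: Seq[ToyCachedPlan], resourcePath: String): Seq[ToyCachedPlan] =
    cached.filter(isAffected(_, resourcePath))
}
```

Qualifying both sides first lets a relative path such as /warehouse/sales match a cached relation registered with a fully-qualified URI. A plain prefix check also matches sibling directories that share a prefix (/warehouse/sales2 under /warehouse/sales), which errs on the side of re-caching too much rather than too little.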
lookupAndRefresh
lookupAndRefresh(
plan: LogicalPlan,
fs: FileSystem,
qualifiedPath: Path): Boolean
lookupAndRefresh...FIXME
refreshFileIndexIfNecessary
refreshFileIndexIfNecessary(
fileIndex: FileIndex,
fs: FileSystem,
qualifiedPath: Path): Boolean
refreshFileIndexIfNecessary...FIXME
Looking Up CachedData
lookupCachedData(
query: Dataset[_]): Option[CachedData]
lookupCachedData(
plan: LogicalPlan): Option[CachedData]
lookupCachedData...FIXME
lookupCachedData is used when:
- Dataset.storageLevel action is used
- CatalogImpl is requested to isCached
- CacheManager is requested to cacheQuery and useCachedData
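Finding a CachedData comes down to comparing query plans. Spark's QueryPlan.sameResult compares the canonicalized forms of two plans, so cosmetic differences such as expression IDs do not prevent a match. The following sketch models that idea with a hypothetical ToyPlan whose canonical form only zeroes out expression IDs. LookupEntry and ToyLookup are hypothetical too, and real canonicalization normalizes far more than IDs.

```scala
// Hypothetical plan node: an operator name, children, and a cosmetic expression ID
final case class ToyPlan(op: String, children: Seq[ToyPlan] = Nil, exprId: Long = 0L) {
  // Canonical form: drop cosmetic details (here, only expression IDs) recursively
  lazy val canonicalized: ToyPlan = ToyPlan(op, children.map(_.canonicalized), 0L)

  // Two plans produce the same result when their canonical forms are equal
  def sameResult(other: ToyPlan): Boolean = canonicalized == other.canonicalized
}

// Hypothetical stand-in for CachedData
final case class LookupEntry(plan: ToyPlan, cachedRepresentation: String)

object ToyLookup {
  // The first cached entry whose plan produces the same result as the query plan
  def lookupCachedData(cachedData: Seq[LookupEntry], plan: ToyPlan): Option[LookupEntry] =
    cachedData.find(cd => plan.sameResult(cd.plan))
}
```

For example, two analyses of the same query get different expression IDs but still hit the same cache entry.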
Un-caching Dataset
uncacheQuery(
query: Dataset[_],
cascade: Boolean,
blocking: Boolean = true): Unit
uncacheQuery(
spark: SparkSession,
plan: LogicalPlan,
cascade: Boolean,
blocking: Boolean): Unit
uncacheQuery...FIXME
uncacheQuery is used when:
- Dataset.unpersist basic action is used
- DropTableCommand and TruncateTableCommand logical commands are executed
- CatalogImpl is requested to uncache and refresh a table or view, dropTempView and dropGlobalTempView
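The cascade flag decides how much of the registry goes away. The following sketch assumes that with cascade enabled every cached plan containing the un-cached plan as a subtree is removed, while with cascade disabled only the matching entry is removed and the cached plans built on top of it are left as candidates for re-caching (through recacheByCondition, per the usage list of Re-Caching Query). UPlan and ToyUncache are hypothetical, and plans are compared with plain equality rather than sameResult.

```scala
// Hypothetical plan tree (not Spark's LogicalPlan)
final case class UPlan(op: String, children: Seq[UPlan] = Nil) {
  // True when this plan or any of its descendants equals `other`
  def containsSubtree(other: UPlan): Boolean =
    this == other || children.exists(_.containsSubtree(other))
}

object ToyUncache {
  // Returns (entries kept in the registry, entries removed)
  def uncache(cached: Seq[UPlan], plan: UPlan, cascade: Boolean): (Seq[UPlan], Seq[UPlan]) = {
    val shouldRemove: UPlan => Boolean =
      if (cascade) _.containsSubtree(plan) // the plan and every cached plan built on it
      else _ == plan                       // only the plan itself
    val (removed, kept) = cached.partition(shouldRemove)
    (kept, removed)
  }

  // With cascade = false, kept plans that still reference the un-cached plan
  // are the candidates for re-caching
  def dependentsToRecache(kept: Seq[UPlan], plan: UPlan): Seq[UPlan] =
    kept.filter(_.containsSubtree(plan))
}
```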
Caching Query
cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit
cacheQuery adds the analyzed logical plan of the input Dataset to the cachedData internal registry of cached queries.
Internally, cacheQuery requests the Dataset for the analyzed logical plan and creates an InMemoryRelation with the following:
- spark.sql.inMemoryColumnarStorage.compressed configuration property
- spark.sql.inMemoryColumnarStorage.batchSize configuration property
- Input storageLevel storage level
- Optimized physical query plan (after requesting SessionState to execute the analyzed logical plan)
- Input tableName
- Statistics of the analyzed query plan
cacheQuery then creates a CachedData (for the analyzed query plan and the InMemoryRelation) and adds it to the cachedData internal registry.
If the input query has already been cached, cacheQuery only prints out the following WARN message to the logs and does nothing else:
Asked to cache already cached data.
cacheQuery is used when:
- Dataset.persist basic action is used
- CatalogImpl is requested to cache and refresh a table or view in-memory
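The following sketch models the two cacheQuery outcomes described above: a new entry that carries the InMemoryRelation settings, or only the WARN message when the query is already cached. ToyInMemoryRelation, ToyCachedQuery and ToyCacheQueryManager are hypothetical. The constructor defaults (compressed = true, batchSize = 10000) mirror the defaults of spark.sql.inMemoryColumnarStorage.compressed and spark.sql.inMemoryColumnarStorage.batchSize, the storage level is a plain string, and the already-cached check uses string equality where Spark uses lookupCachedData.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the InMemoryRelation settings listed above
final case class ToyInMemoryRelation(
    compressed: Boolean,
    batchSize: Int,
    storageLevel: String,
    tableName: Option[String])

// Hypothetical stand-in for CachedData
final case class ToyCachedQuery(plan: String, relation: ToyInMemoryRelation)

final class ToyCacheQueryManager(compressed: Boolean = true, batchSize: Int = 10000) {
  private var cachedData = Vector.empty[ToyCachedQuery]
  // Collected WARN messages (stands in for the logs)
  val warnings = ArrayBuffer.empty[String]

  def cacheQuery(
      plan: String,
      tableName: Option[String] = None,
      storageLevel: String = "MEMORY_AND_DISK"): Unit = synchronized {
    if (cachedData.exists(_.plan == plan)) {
      // Already cached: only warn, leave the registry unchanged
      warnings += "Asked to cache already cached data."
    } else {
      val relation = ToyInMemoryRelation(compressed, batchSize, storageLevel, tableName)
      cachedData = ToyCachedQuery(plan, relation) +: cachedData
    }
  }

  def entries: Vector[ToyCachedQuery] = synchronized { cachedData }
}
```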
Clearing Cache
clearCache(): Unit
clearCache takes every CachedData in the cachedData internal registry, requests it for the InMemoryRelation to access the CachedRDDBuilder, and requests the CachedRDDBuilder to clearCache.
In the end, clearCache removes all CachedData entries from the cachedData internal registry.
clearCache is used when:
- CatalogImpl is requested to clear the cache
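clearCache works in two steps: release the cached data of every entry, then empty the registry. In the simplified model below, ToyRDDBuilder is a hypothetical stand-in for CachedRDDBuilder that only records that clearCache was called. ToyRelation, ToyClearEntry and ToyClearableCache are hypothetical too.

```scala
// Hypothetical stand-in for CachedRDDBuilder: records whether its cache was cleared
final class ToyRDDBuilder {
  @volatile var cleared = false
  def clearCache(): Unit = cleared = true
}

// Hypothetical stand-ins for InMemoryRelation and CachedData
final case class ToyRelation(cacheBuilder: ToyRDDBuilder)
final case class ToyClearEntry(plan: String, cachedRepresentation: ToyRelation)

final class ToyClearableCache {
  private var cachedData = Vector.empty[ToyClearEntry]

  // Registers a plan and returns its builder so callers can inspect it
  def add(plan: String): ToyRDDBuilder = synchronized {
    val builder = new ToyRDDBuilder
    cachedData = ToyClearEntry(plan, ToyRelation(builder)) +: cachedData
    builder
  }

  // Release every entry's cached data first, then empty the registry
  def clearCache(): Unit = synchronized {
    cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
    cachedData = Vector.empty
  }

  def size: Int = synchronized { cachedData.size }
}
```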
Re-Caching Query
recacheByCondition(
spark: SparkSession,
condition: LogicalPlan => Boolean): Unit
recacheByCondition...FIXME
recacheByCondition is used when:
- CacheManager is requested to uncache a structured query, recacheByPlan, and recacheByPath
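The usage list suggests that recacheByCondition is the common building block and that the other re-caching methods differ only in the condition they pass. The following sketch assumes exactly that: recacheByPlan selects cached plans that contain the given plan as a subtree, and recacheByPath selects cached plans that read from a path under the given prefix. RPlan, RCached and ToyRecacher are hypothetical, rebuilding a cache is modeled as bumping a version number, and paths are compared without qualification.

```scala
// Hypothetical plan tree with the file paths each operator reads (not Spark's types)
final case class RPlan(op: String, children: Seq[RPlan] = Nil, paths: Seq[String] = Nil) {
  // True when this node or any descendant satisfies p
  def exists(p: RPlan => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}

// Hypothetical cache entry; version counts how many times it was rebuilt
final case class RCached(plan: RPlan, version: Int)

final class ToyRecacher {
  private var cachedData = Vector.empty[RCached]

  def add(plan: RPlan): Unit = synchronized { cachedData = RCached(plan, 0) +: cachedData }
  def entries: Vector[RCached] = synchronized { cachedData }

  // Take out the matching entries and put them back rebuilt (version + 1)
  def recacheByCondition(condition: RPlan => Boolean): Unit = synchronized {
    val (toRecache, rest) = cachedData.partition(cd => condition(cd.plan))
    cachedData = toRecache.map(cd => cd.copy(version = cd.version + 1)) ++ rest
  }

  // Any cached plan that contains `plan` as a subtree
  def recacheByPlan(plan: RPlan): Unit = recacheByCondition(_.exists(_ == plan))

  // Any cached plan that reads from a path under `resourcePath`
  def recacheByPath(resourcePath: String): Unit =
    recacheByCondition(_.exists(_.paths.exists(_.startsWith(resourcePath))))
}
```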
Re-Caching By Logical Plan
recacheByPlan(
spark: SparkSession,
plan: LogicalPlan): Unit
recacheByPlan...FIXME
recacheByPlan is used when:
- InsertIntoDataSourceCommand logical command is executed
Replacing Segments of Logical Query Plan With Cached Data
useCachedData(
plan: LogicalPlan): LogicalPlan
useCachedData traverses the given logical query plan top-down (parent operators first, children later) and replaces every segment (a logical operator with its subtree) that has cached data with its cached representation (an InMemoryRelation). useCachedData performs the same substitution in the logical plans of SubqueryExpression expressions, too.
useCachedData skips IgnoreCachedData commands (and leaves them unchanged).
useCachedData is used (recursively) when:
- QueryExecution is requested for a logical query plan with cached data
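The top-down order matters: once a parent operator is found in the cache, its whole subtree is replaced and its children are never visited. The following sketch models that with a hypothetical UPlanNode tree (Op, InMemory, IgnoreCached) and a Map from cached plans to ids. It uses structural equality where Spark uses lookupCachedData, and it does not model SubqueryExpression handling.

```scala
// Hypothetical plan tree (not Spark's LogicalPlan)
sealed trait UPlanNode
final case class Op(name: String, children: Seq[UPlanNode] = Nil) extends UPlanNode
// Marks a segment served from the cache (stands in for InMemoryRelation)
final case class InMemory(id: String) extends UPlanNode
// Stands in for an IgnoreCachedData command
final case class IgnoreCached(name: String) extends UPlanNode

object ToyUseCachedData {
  // Parent first: a cached parent is replaced as a whole; otherwise descend into children
  def useCachedData(plan: UPlanNode, cache: Map[UPlanNode, String]): UPlanNode = plan match {
    case cmd: IgnoreCached => cmd // left unchanged
    case p if cache.contains(p) => InMemory(cache(p)) // whole segment served from cache
    case Op(name, children) => Op(name, children.map(useCachedData(_, cache)))
    case other => other
  }
}
```

Even though Scan is cached too, only the Filter segment is replaced, because the traversal reaches Filter first and never descends below a replaced node.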
Logging
Enable ALL logging level for org.apache.spark.sql.execution.CacheManager logger to see what happens inside.
Add the following line to conf/log4j2.properties:
logger.CacheManager.name = org.apache.spark.sql.execution.CacheManager
logger.CacheManager.level = all
Refer to Logging.