CacheManager

CacheManager is a registry of cached structured queries. When QueryExecution is requested for a logical query plan with cached data, the cached queries found in the plan are replaced with their cached representation (the corresponding InMemoryRelation logical operators).

Accessing CacheManager

CacheManager is shared across SparkSessions through SharedState.

val spark: SparkSession = ...
spark.sharedState.cacheManager

Dataset.cache and persist Operators

A structured query (as Dataset) can be cached and registered with CacheManager using Dataset.cache or Dataset.persist high-level operators.
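The following is a minimal sketch of registering a query with CacheManager, assuming Spark 3.x on the classpath and a local session. The application name and the sample query are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 3.x, local mode; the app name is hypothetical
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-manager-demo")
  .getOrCreate()

val cacheManager = spark.sharedState.cacheManager

// Sample query for illustration only
val query = spark.range(5).filter("id > 1")
val registeredBefore = cacheManager.lookupCachedData(query).isDefined

// cache() is persist() with the default storage level
query.cache()
val registeredAfter = cacheManager.lookupCachedData(query).isDefined
```

In a fresh session, registeredBefore should be false and registeredAfter true, since cache only registers the query and does not yet materialize any data.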

Cached Queries

cachedData: LinkedList[CachedData]

CacheManager uses the cachedData internal registry to manage cached structured queries as CachedData with InMemoryRelation leaf logical operators.

A new CachedData is added when CacheManager is requested to:

A CachedData is removed when CacheManager is requested to:

All CachedData are removed (cleared) when CacheManager is requested to clearCache.

Re-Caching By Path

recacheByPath(
  spark: SparkSession,
  resourcePath: String): Unit
recacheByPath(
  spark: SparkSession,
  resourcePath: Path,
  fs: FileSystem): Unit

recacheByPath...FIXME

recacheByPath is used when:

lookupAndRefresh

lookupAndRefresh(
  plan: LogicalPlan,
  fs: FileSystem,
  qualifiedPath: Path): Boolean

lookupAndRefresh...FIXME

refreshFileIndexIfNecessary

refreshFileIndexIfNecessary(
  fileIndex: FileIndex,
  fs: FileSystem,
  qualifiedPath: Path): Boolean

refreshFileIndexIfNecessary...FIXME

refreshFileIndexIfNecessary is used when CacheManager is requested to lookupAndRefresh.

Looking Up CachedData

lookupCachedData(
  query: Dataset[_]): Option[CachedData]
lookupCachedData(
  plan: LogicalPlan): Option[CachedData]

lookupCachedData...FIXME

lookupCachedData is used when:

Un-caching Dataset

uncacheQuery(
  query: Dataset[_],
  cascade: Boolean,
  blocking: Boolean = true): Unit
uncacheQuery(
  spark: SparkSession,
  plan: LogicalPlan,
  cascade: Boolean,
  blocking: Boolean): Unit

uncacheQuery...FIXME

uncacheQuery is used when:

Caching Query

cacheQuery(
  query: Dataset[_],
  tableName: Option[String] = None,
  storageLevel: StorageLevel = MEMORY_AND_DISK): Unit

cacheQuery adds the analyzed logical plan of the input Dataset to the cachedData internal registry of cached queries.

Internally, cacheQuery requests the Dataset for the analyzed logical plan and creates an InMemoryRelation with the following:

cacheQuery then creates a CachedData (for the analyzed query plan and the InMemoryRelation) and adds it to the cachedData internal registry.

If the input query has already been cached, cacheQuery does nothing except print out the following WARN message to the logs:

Asked to cache already cached data.
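The flow described above can be sketched with a Spark-free toy model. All types below (Plan, InMemoryRelation, CachedData, ToyCacheManager) are hypothetical stand-ins that only borrow the names used in this page, and plans are compared by plain equality, a simplification of how Spark matches query plans.

```scala
// Toy model only: these types mimic the names above but are NOT Spark's classes
final case class Plan(description: String)
final case class InMemoryRelation(cachedPlan: Plan)
final case class CachedData(plan: Plan, cachedRepresentation: InMemoryRelation)

final class ToyCacheManager {
  // Stands in for the cachedData internal registry
  private var cachedData = Vector.empty[CachedData]
  // Stands in for the logger
  var warnings = Vector.empty[String]

  def lookupCachedData(plan: Plan): Option[CachedData] =
    cachedData.find(_.plan == plan)

  def cacheQuery(plan: Plan): Unit =
    if (lookupCachedData(plan).nonEmpty) {
      // Already cached: warn and leave the registry untouched
      warnings :+= "Asked to cache already cached data."
    } else {
      // Not cached yet: pair the plan with its cached representation
      cachedData :+= CachedData(plan, InMemoryRelation(plan))
    }

  def size: Int = cachedData.size
}

val manager = new ToyCacheManager
val plan = Plan("Range (0, 5)")
manager.cacheQuery(plan)
manager.cacheQuery(plan) // the second request only records the warning
```

After both calls, the toy registry holds a single entry and one warning has been recorded.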

cacheQuery is used when:

Clearing Cache

clearCache(): Unit

For every CachedData in the cachedData internal registry, clearCache requests the InMemoryRelation for the CachedRDDBuilder and requests the CachedRDDBuilder to clearCache.

In the end, clearCache removes all CachedData entries from the cachedData internal registry.

clearCache is used when CatalogImpl is requested to clear the cache.
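As a hedged illustration, clearing the cache through the public Catalog API could look as follows, assuming Spark 3.x in local mode. The application name and the two sample queries are made up.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 3.x, local mode; the app name is hypothetical
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("clear-cache-demo")
  .getOrCreate()

val cacheManager = spark.sharedState.cacheManager

// Two sample cached queries
val evens = spark.range(10).filter("id % 2 = 0").cache()
val odds = spark.range(10).filter("id % 2 = 1").cache()
val cachedBefore =
  Seq(evens, odds).forall(ds => cacheManager.lookupCachedData(ds).isDefined)

// Catalog.clearCache removes every entry from the registry
spark.catalog.clearCache()
val cachedAfter =
  Seq(evens, odds).exists(ds => cacheManager.lookupCachedData(ds).isDefined)
```

Both queries should be registered before the call and neither afterwards.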

Re-Caching Query

recacheByCondition(
  spark: SparkSession,
  condition: LogicalPlan => Boolean): Unit

recacheByCondition...FIXME

recacheByCondition is used when CacheManager is requested to uncache a structured query, recacheByPlan, and recacheByPath.

Re-Caching By Logical Plan

recacheByPlan(
  spark: SparkSession,
  plan: LogicalPlan): Unit

recacheByPlan...FIXME

recacheByPlan is used when InsertIntoDataSourceCommand logical command is executed.

Replacing Segments of Logical Query Plan With Cached Data

useCachedData(
  plan: LogicalPlan): LogicalPlan

useCachedData traverses the given logical query plan top-down (parent operators first, children later) and replaces every operator that has a cached counterpart with its cached representation (i.e. InMemoryRelation). useCachedData does the same substitution in the query plans of SubqueryExpression expressions.

useCachedData skips IgnoreCachedData commands (and leaves them unchanged).

useCachedData is used (recursively) when QueryExecution is requested for a logical query plan with cached data.
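One way to observe the substitution is to compare the analyzed plan of a query with the plan that QueryExecution produces with cached data. This is a sketch assuming Spark 3.x in local mode; the application name and the sample queries are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Assumption: Spark 3.x, local mode; the app name is hypothetical
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("use-cached-data-demo")
  .getOrCreate()

// Register the base query with CacheManager
spark.range(5).cache()

// A separate query that contains the cached query as a sub-plan
val query = spark.range(5).filter("id > 2")

// The analyzed plan knows nothing about cached data
val inAnalyzed = query.queryExecution.analyzed.collect {
  case r: InMemoryRelation => r
}

// withCachedData is the plan after the substitution
val inWithCachedData = query.queryExecution.withCachedData.collect {
  case r: InMemoryRelation => r
}
```

The Filter parent operator has no cached counterpart and stays as it is, while its Range child is expected to be replaced with an InMemoryRelation, so inAnalyzed should be empty and inWithCachedData should hold one relation.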

Logging

Enable ALL logging level for org.apache.spark.sql.execution.CacheManager logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.CacheManager.name = org.apache.spark.sql.execution.CacheManager
logger.CacheManager.level = all

Refer to Logging.