ContextCleaner

ContextCleaner is a Spark service responsible for application-wide garbage collection (cleanup) of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs. It is aimed at reducing the memory requirements of long-running, data-heavy Spark applications.

Figure 1. ContextCleaner and SparkContext

Creating Instance

ContextCleaner takes a SparkContext to be created.

ContextCleaner is created and requested to start when SparkContext is created with the spark.cleaner.referenceTracking configuration property enabled.

Spark Context Cleaner Cleaning Thread

ContextCleaner uses a daemon thread named Spark Context Cleaner to clean RDD, shuffle, and broadcast states.

The Spark Context Cleaner thread is started when ContextCleaner is requested to start.
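
The following is a minimal, stdlib-only sketch of how a named daemon thread like this can be created and started. CleaningThreadSketch and its members are hypothetical names chosen for illustration, and the loop body is a stand-in, not Spark's actual cleaning logic.

```scala
// Hypothetical sketch: a named daemon thread that runs a cleaning loop.
object CleaningThreadSketch {
  @volatile private var stopped = false

  // Stand-in for the real cleaning loop: idles until stop() is called.
  private def keepCleaning(): Unit = {
    while (!stopped) Thread.sleep(10)
  }

  val cleaningThread: Thread = new Thread("Spark Context Cleaner") {
    override def run(): Unit = keepCleaning()
  }

  def start(): Unit = {
    // A daemon thread does not prevent the JVM from exiting.
    cleaningThread.setDaemon(true)
    cleaningThread.start()
  }

  def stop(): Unit = {
    stopped = true
    cleaningThread.join(1000)
  }
}
```

Marking the thread as a daemon before starting it means an application can exit even if the cleaner was never stopped explicitly.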

CleanerListeners

ContextCleaner allows attaching CleanerListeners (using the attachListener method) that are informed when objects are cleaned.

attachListener(
  listener: CleanerListener): Unit
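
As an illustration of the listener pattern described above, here is a simplified sketch. The CleanerListener trait below is a reduced, hypothetical version with only two callbacks, and ListenerSketch and notifyShuffleCleaned are made-up names, so treat it as a model of the idea rather than Spark's own definition.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch of attaching listeners and notifying them.
object ListenerSketch {
  // Reduced listener interface (assumption: only two callbacks shown).
  trait CleanerListener {
    def rddCleaned(rddId: Int): Unit
    def shuffleCleaned(shuffleId: Int): Unit
  }

  // Thread-safe collection, since listeners are notified from the cleaning thread.
  private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

  def attachListener(listener: CleanerListener): Unit = {
    listeners.add(listener)
    ()
  }

  // Informs every attached listener that the given shuffle was cleaned.
  def notifyShuffleCleaned(shuffleId: Int): Unit =
    listeners.forEach(l => l.shuffleCleaned(shuffleId))
}
```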

doCleanupRDD Method

doCleanupRDD(
  rddId: Int,
  blocking: Boolean): Unit

doCleanupRDD…​FIXME

doCleanupRDD is used when ContextCleaner is requested to keepCleaning for a CleanRDD.

keepCleaning Internal Method

keepCleaning(): Unit

keepCleaning runs indefinitely until ContextCleaner is requested to stop. keepCleaning…​FIXME

keepCleaning prints out the following DEBUG message to the logs:

Got cleaning task [task]

keepCleaning is used in Spark Context Cleaner Cleaning Thread that is started once when ContextCleaner is requested to start.
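
A loop of this kind can be sketched with the JDK's ReferenceQueue alone. The sketch below assumes the loop polls a reference queue with a timeout and dispatches on the task type. KeepCleaningSketch, TaskReference and pollOnce are hypothetical names, and the 100 ms timeout is an arbitrary choice for the example.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Hypothetical sketch of a cleaning loop driven by a reference queue.
object KeepCleaningSketch {
  sealed trait CleanupTask
  final case class CleanRDD(rddId: Int) extends CleanupTask
  final case class CleanShuffle(shuffleId: Int) extends CleanupTask

  // A weak reference that remembers which cleanup to run for its referent.
  final class TaskReference(
      val task: CleanupTask,
      referent: AnyRef,
      queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  val referenceQueue = new ReferenceQueue[AnyRef]
  @volatile var stopped = false

  // Waits up to timeoutMs (must be > 0) for one enqueued reference and handles its task.
  // Returns true if a task was handled.
  def pollOnce(timeoutMs: Long)(handle: CleanupTask => Unit): Boolean =
    Option(referenceQueue.remove(timeoutMs)) match {
      case Some(ref: TaskReference) =>
        println(s"Got cleaning task ${ref.task}")
        handle(ref.task)
        true
      case _ => false
    }

  // Runs until `stopped` is set; an error in one task does not end the loop.
  def keepCleaning(handle: CleanupTask => Unit): Unit =
    while (!stopped) {
      try { pollOnce(100)(handle); () }
      catch { case e: Exception => println(s"Error in cleaning thread: $e") }
    }
}
```

Polling with a timeout, rather than blocking forever, lets the loop notice the stop flag promptly.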

registerRDDCheckpointDataForCleanup Method

registerRDDCheckpointDataForCleanup[T](
  rdd: RDD[_],
  parentId: Int): Unit

registerRDDCheckpointDataForCleanup…​FIXME

registerRDDCheckpointDataForCleanup is used when ReliableRDDCheckpointData is requested to doCheckpoint (with spark.cleaner.referenceTracking.cleanCheckpoints configuration property enabled).

registerBroadcastForCleanup Method

registerBroadcastForCleanup[T](
  broadcast: Broadcast[T]): Unit

registerBroadcastForCleanup…​FIXME

registerBroadcastForCleanup is used when SparkContext is requested to create a broadcast variable.

registerRDDForCleanup Method

registerRDDForCleanup(
  rdd: RDD[_]): Unit

registerRDDForCleanup…​FIXME

registerRDDForCleanup is used when an RDD is requested to persist.

registerAccumulatorForCleanup Method

registerAccumulatorForCleanup(
  a: AccumulatorV2[_, _]): Unit

registerAccumulatorForCleanup…​FIXME

registerAccumulatorForCleanup is used when AccumulatorV2 is requested to register.

Stopping ContextCleaner

stop(): Unit

stop…​FIXME

stop is used when SparkContext is requested to stop.

Starting ContextCleaner

start(): Unit

start starts the cleaning thread and schedules an action that requests the JVM garbage collector (using System.gc()) on a regular basis, at the interval given by the spark.cleaner.periodicGC.interval configuration property.

The action to request the JVM GC is scheduled on periodicGCService executor service.

start is used when SparkContext is created.

periodicGCService Single-Thread Executor Service

periodicGCService is an internal single-thread executor service with the name context-cleaner-periodic-gc to request the JVM garbage collector.

The periodic runs are started when ContextCleaner starts and stopped when ContextCleaner stops.
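
The periodic GC request can be modelled with a standard ScheduledExecutorService. The sketch below is an assumption-laden illustration: PeriodicGCSketch, its start and stop methods, and the millisecond interval parameter are hypothetical, and the action is a parameter only so the example can be exercised without triggering real GC runs.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical sketch of a single-thread executor that periodically requests a GC.
object PeriodicGCSketch {
  // Names the single worker thread and makes it a daemon.
  private val daemonThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "context-cleaner-periodic-gc")
      t.setDaemon(true)
      t
    }
  }

  val periodicGCService: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(daemonThreadFactory)

  // Schedules `action` (System.gc() by default) at a fixed rate.
  def start(intervalMs: Long, action: Runnable = () => System.gc()): Unit = {
    periodicGCService.scheduleAtFixedRate(action, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    ()
  }

  // Stops further periodic runs.
  def stop(): Unit = periodicGCService.shutdown()
}
```

Weak references are only enqueued after a garbage collection, so on a driver with a large heap and little allocation pressure a periodic request like this is what gives the cleaner something to do.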

Registering ShuffleDependency for Cleanup

registerShuffleForCleanup(
  shuffleDependency: ShuffleDependency[_, _, _]): Unit

registerShuffleForCleanup registers the given ShuffleDependency for cleanup.

Internally, registerShuffleForCleanup simply executes registerForCleanup for the given ShuffleDependency.

registerShuffleForCleanup is used when ShuffleDependency is created.

Registering Object Reference For Cleanup

registerForCleanup(
  objectForCleanup: AnyRef,
  task: CleanupTask): Unit

registerForCleanup wraps the input objectForCleanup (together with the given CleanupTask) in a CleanupTaskWeakReference and adds the reference to the referenceBuffer internal queue.

Although objectForCleanup is declared with the widest-possible AnyRef type, what referenceBuffer really holds is a CleanupTaskWeakReference, a custom subclass of Java's java.lang.ref.WeakReference.
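
The registration step can be sketched with JDK classes only. In the sketch below, RegisterSketch is a hypothetical name, the shuffle dependency is represented by a plain AnyRef plus an explicit shuffleId, and referenceBuffer is modelled as a concurrent set, which is an assumption of this example.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of registering an object for cleanup via a weak reference.
object RegisterSketch {
  sealed trait CleanupTask
  final case class CleanShuffle(shuffleId: Int) extends CleanupTask

  // Weak reference carrying the cleanup task to run once the referent is collected.
  final class CleanupTaskWeakReference(
      val task: CleanupTask,
      referent: AnyRef,
      queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  val referenceQueue = new ReferenceQueue[AnyRef]

  // Keeps the weak references themselves strongly reachable until they are processed.
  val referenceBuffer: java.util.Set[CleanupTaskWeakReference] =
    Collections.newSetFromMap(
      new ConcurrentHashMap[CleanupTaskWeakReference, java.lang.Boolean]())

  def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
    ()
  }

  // Shuffle registration simply delegates with a CleanShuffle task.
  def registerShuffleForCleanup(shuffleId: Int, dependency: AnyRef): Unit =
    registerForCleanup(dependency, CleanShuffle(shuffleId))
}
```

The buffer matters because a weak reference that is itself unreachable is never enqueued, so the task it carries would be lost.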

Shuffle Cleanup

doCleanupShuffle(
  shuffleId: Int,
  blocking: Boolean): Unit

doCleanupShuffle performs a shuffle cleanup, which removes the shuffle from the current MapOutputTrackerMaster and BlockManagerMaster. doCleanupShuffle also notifies CleanerListeners.

Internally, when executed, doCleanupShuffle prints out the following DEBUG message to the logs:

Cleaning shuffle [id]

doCleanupShuffle uses SparkEnv to access the MapOutputTrackerMaster to unregister the given shuffle.

doCleanupShuffle uses SparkEnv to access the BlockManagerMaster to remove the shuffle blocks (for the given shuffleId).

doCleanupShuffle informs all registered CleanerListeners that shuffle was cleaned.

In the end, doCleanupShuffle prints out the following DEBUG message to the logs:

Cleaned shuffle [id]

In case of any exception, doCleanupShuffle prints out the following ERROR message (together with the exception itself) to the logs:

Error cleaning shuffle [id]

doCleanupShuffle is used when ContextCleaner is requested to clean a shuffle reference and (interestingly) while fitting an ALSModel (in Spark MLlib).
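
The sequence of steps described in this section can be summarized in a small sketch. The traits below (MapOutputTrackerLike, BlockManagerMasterLike, ShuffleListener) and the Cleaner class are hypothetical stand-ins for the collaborators named in the text, and logging is reduced to a plain function, so this only shows the order of operations and the error handling.

```scala
// Hypothetical sketch of the shuffle cleanup sequence.
object ShuffleCleanupSketch {
  // Stand-ins for the collaborators named in the text (assumed shapes).
  trait MapOutputTrackerLike { def unregisterShuffle(shuffleId: Int): Unit }
  trait BlockManagerMasterLike { def removeShuffle(shuffleId: Int, blocking: Boolean): Unit }
  trait ShuffleListener { def shuffleCleaned(shuffleId: Int): Unit }

  final class Cleaner(
      tracker: MapOutputTrackerLike,
      blockManagerMaster: BlockManagerMasterLike,
      listeners: Seq[ShuffleListener],
      log: String => Unit) {

    def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit =
      try {
        log(s"DEBUG Cleaning shuffle $shuffleId")
        tracker.unregisterShuffle(shuffleId)                    // drop map output metadata
        blockManagerMaster.removeShuffle(shuffleId, blocking)   // remove shuffle blocks
        listeners.foreach(_.shuffleCleaned(shuffleId))          // inform listeners
        log(s"DEBUG Cleaned shuffle $shuffleId")
      } catch {
        // Any failure is logged instead of being rethrown.
        case e: Exception => log(s"ERROR Error cleaning shuffle $shuffleId: $e")
      }
  }
}
```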

Logging

Enable ALL logging level for org.apache.spark.ContextCleaner logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.ContextCleaner=ALL

Refer to Logging.

Internal Properties

referenceBuffer

referenceQueue