ContextCleaner is a Spark service responsible for application-wide garbage collection (cleanup) of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs. It aims to reduce the memory requirements of long-running, data-heavy Spark applications.
ContextCleaner uses a daemon thread named Spark Context Cleaner to clean RDD, shuffle, and broadcast states.
The Spark Context Cleaner thread is started when ContextCleaner is requested to start.
ContextCleaner allows attaching CleanerListeners (to be informed when objects are cleaned) using the attachListener method:
attachListener(listener: CleanerListener): Unit
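The listener mechanism can be illustrated with a minimal, self-contained sketch. It does not use Spark's own CleanerListener; the Listener trait, Registry class, notifyRddCleaned method and RecordingListener below are hypothetical names introduced only for illustration, and the thread-safe queue is an assumption about how such a registry could be kept.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

object ListenerSketch {
  // Hypothetical, reduced listener contract (not Spark's CleanerListener).
  trait Listener {
    def rddCleaned(rddId: Int): Unit
    def shuffleCleaned(shuffleId: Int): Unit
  }

  final class Registry {
    // Thread-safe, so listeners can be attached while a cleaning thread iterates.
    private val listeners = new ConcurrentLinkedQueue[Listener]()

    def attachListener(listener: Listener): Unit = listeners.add(listener)

    // Called by the (hypothetical) cleanup code once an RDD has been cleaned.
    def notifyRddCleaned(rddId: Int): Unit =
      listeners.asScala.foreach(_.rddCleaned(rddId))
  }

  // Example listener that just records what it was told.
  final class RecordingListener extends Listener {
    val events = ArrayBuffer.empty[String]
    def rddCleaned(rddId: Int): Unit = events += s"rdd $rddId"
    def shuffleCleaned(shuffleId: Int): Unit = events += s"shuffle $shuffleId"
  }
}
```

A listener attached this way sees every later notification, which is how tests can wait for a cleanup to happen instead of sleeping.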
doCleanupRDD(rddId: Int, blocking: Boolean): Unit
doCleanupRDD is used when ContextCleaner is requested to keepCleaning for a CleanRDD.
keepCleaning runs indefinitely until ContextCleaner is requested to stop. keepCleaning…FIXME
keepCleaning prints out the following DEBUG message to the logs:
Got cleaning task [task]
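A loop of this kind can be sketched without Spark as a thread body that polls a java.lang.ref.ReferenceQueue with a timeout until a stop flag is set. This is a simplified illustration under assumptions: the Task types, TaskRef, Loop and pollOnce are hypothetical names, the 100 ms poll timeout is arbitrary, and the real dispatch to the doCleanup methods is replaced by recording the task.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

object CleaningLoopSketch {
  // Hypothetical stand-ins for the cleanup task types.
  sealed trait Task
  final case class CleanRDD(rddId: Int) extends Task
  final case class CleanShuffle(shuffleId: Int) extends Task

  // Weak reference that carries the task to run once its referent is gone.
  final class TaskRef(val task: Task, referent: AnyRef, q: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, q)

  final class Loop(queue: ReferenceQueue[AnyRef]) {
    @volatile private var stopped = false
    val handled = ArrayBuffer.empty[Task]

    def stop(): Unit = stopped = true

    // Waits up to timeoutMs for one enqueued reference and handles it.
    def pollOnce(timeoutMs: Long): Unit =
      Option(queue.remove(timeoutMs)).foreach {
        case ref: TaskRef =>
          // A real cleaner would dispatch on the task type here.
          handled += ref.task
        case _ => ()
      }

    // Runs until stop() is called; the timeout bounds the reaction time to stop().
    def keepCleaning(): Unit = while (!stopped) pollOnce(100L)
  }
}
```

In a test, calling enqueue() on a TaskRef simulates the garbage collector discovering that the referent is unreachable, so pollOnce can be exercised deterministically.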
registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit
registerBroadcastForCleanup is used when SparkContext is requested to create a broadcast variable.
registerRDDForCleanup(rdd: RDD[_]): Unit
registerRDDForCleanup is used for the RDD.persist operation.
registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit
registerAccumulatorForCleanup is used when AccumulatorV2 is requested to register.
stop is used when SparkContext is requested to stop.
The action to request the JVM GC (System.gc) is scheduled on the periodicGCService executor service.
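Scheduling such an action can be sketched with a plain java.util.concurrent scheduled executor. The object name, thread name and the interval parameter are assumptions made for this sketch; only the idea of periodically calling System.gc on a dedicated executor comes from the text.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object PeriodicGcSketch {
  // Daemon thread, so the scheduler never keeps the JVM alive on its own.
  private val daemonFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "periodic-gc-sketch")
      t.setDaemon(true)
      t
    }
  }

  val periodicGCService: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(daemonFactory)

  // Requests a JVM GC every intervalSeconds, starting after one interval.
  def start(intervalSeconds: Long): Unit = {
    periodicGCService.scheduleAtFixedRate(
      () => System.gc(), intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
  }

  // Stops scheduling further GC requests.
  def stop(): Unit = periodicGCService.shutdown()
}
```

The periodic GC matters because weak references are only enqueued after a collection; on a driver with a large heap and little memory pressure, collections (and hence cleanups) could otherwise be rare.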
start is used when SparkContext is created.
registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit
registerShuffleForCleanup registers the given ShuffleDependency for cleanup.
Internally, registerShuffleForCleanup simply executes registerForCleanup for the given ShuffleDependency.
registerShuffleForCleanup is used when ShuffleDependency is created.
registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit
registerForCleanup wraps the input objectForCleanup (together with the task) in a CleanupTaskWeakReference and adds it to the referenceBuffer internal collection.
Despite the widest-possible AnyRef type of the input objectForCleanup, what is actually tracked is a CleanupTaskWeakReference, which is a custom subclass of Java’s java.lang.ref.WeakReference.
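The registration step can be sketched with a small WeakReference subclass that remembers its cleanup task. The names below (TaskWeakReference, the reduced CleanupTask hierarchy, RegistrationSketch) are hypothetical stand-ins, and the concurrent set used as the buffer is an assumption of this sketch rather than a statement about Spark's source.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

object RegistrationSketch {
  // Hypothetical stand-ins for the cleanup task types.
  sealed trait CleanupTask
  final case class CleanRDD(rddId: Int) extends CleanupTask
  final case class CleanShuffle(shuffleId: Int) extends CleanupTask

  // A weak reference that remembers which cleanup task to run
  // once its referent has been garbage collected.
  final class TaskWeakReference(
      val task: CleanupTask,
      referent: AnyRef,
      q: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, q)

  // Collected references are enqueued here by the JVM.
  val referenceQueue = new ReferenceQueue[AnyRef]

  // Keeps the weak references themselves strongly reachable until handled;
  // otherwise they could be collected before ever reaching the queue.
  val referenceBuffer: java.util.Set[TaskWeakReference] =
    Collections.newSetFromMap(
      new ConcurrentHashMap[TaskWeakReference, java.lang.Boolean])

  def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(
      new TaskWeakReference(task, objectForCleanup, referenceQueue))
  }
}
```

The buffer holds only the reference objects, never the tracked object itself, so registering an object does not prevent it from being garbage collected.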
doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit
Internally, when executed, doCleanupShuffle prints out the following DEBUG message to the logs:
Cleaning shuffle [id]
In the end, doCleanupShuffle prints out the following DEBUG message to the logs:
Cleaned shuffle [id]
In case of any exception, doCleanupShuffle prints out the following ERROR message to the logs and the exception itself:
Error cleaning shuffle [id]
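The logging flow above (DEBUG before, DEBUG after, ERROR on failure) can be sketched as a try/catch around the actual cleanup action. In this sketch the cleanup action is a hypothetical function parameter (unregisterShuffle), log messages are collected in a buffer instead of going to a logger, and the blocking flag is accepted but unused.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object ShuffleCleanupSketch {
  // unregisterShuffle is a hypothetical stand-in for the real cleanup work.
  final class Cleaner(unregisterShuffle: Int => Unit) {
    // Collected log lines, prefixed with their level.
    val log = ArrayBuffer.empty[String]

    // blocking is unused here; it only mirrors the documented signature.
    def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
      try {
        log += s"DEBUG Cleaning shuffle $shuffleId"
        unregisterShuffle(shuffleId)
        log += s"DEBUG Cleaned shuffle $shuffleId"
      } catch {
        // A failed cleanup is logged and swallowed so the caller keeps running.
        case NonFatal(e) =>
          log += s"ERROR Error cleaning shuffle $shuffleId: ${e.getMessage}"
      }
    }
  }
}
```

Swallowing the exception is a deliberate choice in this sketch: a single failed cleanup should not terminate a long-running cleaning thread.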
doCleanupShuffle is used when ContextCleaner is requested to clean a shuffle reference and (interestingly) while fitting an ALSModel (in Spark MLlib).
Enable ALL logging level for org.apache.spark.ContextCleaner logger to see what happens inside.
Add the corresponding logger line to the logging configuration (refer to Logging).