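ContextCleaner is a Spark service that is responsible for <<keepCleaning, garbage collection>> (cleanup) of <<registerShuffleForCleanup, shuffles>>, <<registerRDDForCleanup, RDDs>>, <<registerBroadcastForCleanup, broadcasts>>, <<registerAccumulatorForCleanup, accumulators>> and <<registerRDDCheckpointDataForCleanup, checkpointed RDDs>>. The cleanup is aimed at reducing the memory requirements of long-running data-heavy Spark applications.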

ContextCleaner and SparkContext

== Creating Instance

ContextCleaner takes the following to be created:

  • [[sc]] SparkContext

ContextCleaner is created and requested to <<start, start>> when SparkContext is created with the spark.cleaner.referenceTracking configuration property enabled.

== [[cleaningThread]] Spark Context Cleaner Cleaning Thread

ContextCleaner uses a daemon thread Spark Context Cleaner to clean RDD, shuffle, and broadcast states.

The Spark Context Cleaner thread is started when ContextCleaner is requested to <<start, start>>.
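To illustrate the lifecycle of such a thread, here is a minimal, self-contained sketch. It is not Spark's code: the class name `CleaningThreadSketch` and the `stopped` flag are assumptions made for illustration, and the loop body only sleeps where a real cleaning thread would process cleanup tasks.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical class; models only the thread lifecycle, not Spark's cleanup logic
class CleaningThreadSketch {
  // Flag checked by the loop so the thread can finish when stop() is called
  private val stopped = new AtomicBoolean(false)

  val cleaningThread: Thread = new Thread("Spark Context Cleaner") {
    override def run(): Unit = {
      while (!stopped.get()) {
        Thread.sleep(10) // placeholder for processing cleanup tasks
      }
    }
  }
  // A daemon thread does not keep the JVM alive on its own
  cleaningThread.setDaemon(true)

  def start(): Unit = cleaningThread.start()

  def stop(): Unit = {
    stopped.set(true)
    cleaningThread.join(1000) // wait up to one second for the loop to exit
  }
}
```

Marking the thread as a daemon means an application can exit even if the cleaner was never stopped explicitly.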

== [[listeners]][[attachListener]] CleanerListeners

ContextCleaner allows attaching CleanerListeners (using the attachListener method) that are informed when objects are cleaned.

[source, scala]

attachListener(listener: CleanerListener): Unit
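The listener mechanism can be sketched as a plain observer pattern. The code below is a simplified stand-in, not Spark's implementation: the `CleanerListener` trait here models only two callbacks, and the `Cleaner` class and `notifyShuffleCleaned` method are hypothetical names introduced for the example.

```scala
object ListenerSketch {
  // Simplified stand-in for a cleaner listener (only two callbacks are modeled)
  trait CleanerListener {
    def rddCleaned(rddId: Int): Unit
    def shuffleCleaned(shuffleId: Int): Unit
  }

  // Hypothetical cleaner that keeps track of attached listeners
  class Cleaner {
    private var listeners = Vector.empty[CleanerListener]

    def attachListener(listener: CleanerListener): Unit = synchronized {
      listeners = listeners :+ listener
    }

    // Informs every attached listener that a shuffle was cleaned
    def notifyShuffleCleaned(shuffleId: Int): Unit = {
      val snapshot = synchronized { listeners }
      snapshot.foreach(_.shuffleCleaned(shuffleId))
    }
  }
}
```

Taking a snapshot of the listeners before notifying them keeps the callbacks outside the lock, so a slow listener does not block new attachments.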

== [[doCleanupRDD]] doCleanupRDD Method

[source, scala]

doCleanupRDD(rddId: Int, blocking: Boolean): Unit


doCleanupRDD is used when ContextCleaner is requested to <<keepCleaning, keepCleaning>> for a CleanRDD.

== [[keepCleaning]] keepCleaning Internal Method

[source, scala]

keepCleaning(): Unit

keepCleaning runs indefinitely until ContextCleaner is requested to <<stop, stop>>. keepCleaning...FIXME

keepCleaning prints out the following DEBUG message to the logs:


Got cleaning task [task]

keepCleaning is used in the <<cleaningThread, Spark Context Cleaner Cleaning Thread>> that is started once when ContextCleaner is requested to <<start, start>>.
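The loop-and-dispatch idea can be sketched as follows. This is an illustration under assumptions, not Spark's code: a `LinkedBlockingQueue` stands in for the internal reference queue, the task case classes are local stand-ins, the handlers only return a description of the call they represent, and `println` replaces DEBUG logging.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

object KeepCleaningSketch {
  // Local stand-ins for cleanup task types (not Spark's own classes)
  sealed trait CleanupTask
  final case class CleanRDD(rddId: Int) extends CleanupTask
  final case class CleanShuffle(shuffleId: Int) extends CleanupTask
  final case class CleanBroadcast(broadcastId: Long) extends CleanupTask

  // Routes a task to its handler; here a handler is just a description
  def dispatch(task: CleanupTask): String = task match {
    case CleanRDD(id)       => s"doCleanupRDD($id)"
    case CleanShuffle(id)   => s"doCleanupShuffle($id)"
    case CleanBroadcast(id) => s"doCleanupBroadcast($id)"
  }

  // Loops until `stopped` returns true, handling one task per iteration
  def keepCleaning(
      queue: LinkedBlockingQueue[CleanupTask],
      stopped: () => Boolean): List[String] = {
    val handled = ListBuffer.empty[String]
    while (!stopped()) {
      // Poll with a timeout so the stop condition is rechecked regularly
      Option(queue.poll(100, TimeUnit.MILLISECONDS)).foreach { task =>
        println(s"Got cleaning task $task")
        handled += dispatch(task)
      }
    }
    handled.toList
  }
}
```

Polling with a timeout, rather than blocking forever, is what lets the loop notice a stop request even when no cleanup tasks arrive.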

== [[registerRDDCheckpointDataForCleanup]] registerRDDCheckpointDataForCleanup Method

[source, scala]

registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit


registerRDDCheckpointDataForCleanup is used when an RDD is checkpointed to reliable storage (with the spark.cleaner.referenceTracking.cleanCheckpoints configuration property enabled).

== [[registerBroadcastForCleanup]] registerBroadcastForCleanup Method

[source, scala]

registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit


registerBroadcastForCleanup is used when SparkContext is used to create a broadcast variable.

== [[registerRDDForCleanup]] registerRDDForCleanup Method

[source, scala]

registerRDDForCleanup(rdd: RDD[_]): Unit


registerRDDForCleanup is used for the RDD.persist operation.

== [[registerAccumulatorForCleanup]] registerAccumulatorForCleanup Method

[source, scala]

registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit


registerAccumulatorForCleanup is used when AccumulatorV2 is requested to register.

== [[stop]] Stopping ContextCleaner

[source, scala]

stop(): Unit


stop is used when SparkContext is requested to stop.

== [[start]] Starting ContextCleaner

[source, scala]

start(): Unit

start starts the <<cleaningThread, cleaning thread>> and an action to request the JVM garbage collector (using System.gc()) on a regular basis per the spark.cleaner.periodicGC.interval configuration property.

The action to request the JVM GC is scheduled on the <<periodicGCService, periodicGCService executor service>>.

start is used when SparkContext is created.

== [[periodicGCService]] periodicGCService Single-Thread Executor Service

periodicGCService is an internal single-thread {java-javadoc-url}/java/util/concurrent/ScheduledExecutorService.html[executor service] with the name context-cleaner-periodic-gc to request the JVM garbage collector.

The periodic runs are started when <<start, ContextCleaner starts>> and stopped when <<stop, ContextCleaner stops>>.
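A single-thread scheduled executor that periodically requests a JVM garbage collection can be sketched with the JDK alone. The class name `PeriodicGCSketch` and the `intervalSeconds` parameter are assumptions for this example; in Spark the interval would come from the configuration property mentioned earlier.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical class; intervalSeconds stands in for a configured GC interval
class PeriodicGCSketch(intervalSeconds: Long) {
  // Names the single worker thread and marks it as a daemon
  private val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "context-cleaner-periodic-gc")
      t.setDaemon(true)
      t
    }
  }

  val periodicGCService: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(threadFactory)

  // Schedules System.gc() to run repeatedly, first after one full interval
  def start(): Unit = {
    val gcTask = new Runnable { override def run(): Unit = System.gc() }
    periodicGCService.scheduleAtFixedRate(
      gcTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
    ()
  }

  // Stops future runs of the periodic task
  def stop(): Unit = {
    periodicGCService.shutdown()
  }
}
```

Note that `System.gc()` is only a request to the JVM. The periodic call matters because weak references are enqueued for cleanup only after a garbage collection has actually run.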

== [[registerShuffleForCleanup]] Registering ShuffleDependency for Cleanup

[source, scala]

registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit

registerShuffleForCleanup registers the given ShuffleDependency for cleanup.

Internally, registerShuffleForCleanup simply executes <<registerForCleanup, registerForCleanup>> for the given ShuffleDependency.

registerShuffleForCleanup is used when ShuffleDependency is created.

== [[registerForCleanup]] Registering Object Reference For Cleanup

[source, scala]

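registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit

registerForCleanup adds the input objectForCleanup to the <<referenceBuffer, referenceBuffer>> internal queue.

Despite the widest-possible AnyRef type of the input objectForCleanup, what is actually added is a CleanupTaskWeakReference (a custom {java-javadoc-url}/java/lang/ref/WeakReference.html[java.lang.ref.WeakReference]) that wraps the object together with its cleanup task.

registerForCleanup is used when ContextCleaner is requested to <<registerRDDForCleanup, registerRDDForCleanup>>, <<registerAccumulatorForCleanup, registerAccumulatorForCleanup>>, <<registerShuffleForCleanup, registerShuffleForCleanup>>, <<registerBroadcastForCleanup, registerBroadcastForCleanup>>, and <<registerRDDCheckpointDataForCleanup, registerRDDCheckpointDataForCleanup>>.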
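The weak-reference registration described in this section can be sketched with the JDK's `WeakReference` and `ReferenceQueue`. This is a simplified model under assumptions: the enclosing object name and the single `CleanShuffle` task type are stand-ins for illustration, and a concurrent set plays the role of the internal buffer.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

object RegisterForCleanupSketch {
  // Local stand-ins for cleanup task types (not Spark's own classes)
  sealed trait CleanupTask
  final case class CleanShuffle(shuffleId: Int) extends CleanupTask

  // A weak reference that remembers which cleanup task to run
  // once its referent has been garbage collected
  class CleanupTaskWeakReference(
      val task: CleanupTask,
      referent: AnyRef,
      queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  // The JVM enqueues a weak reference here after its referent is collected
  val referenceQueue = new ReferenceQueue[AnyRef]

  // Keeps the wrappers strongly reachable so they are not collected themselves
  val referenceBuffer: java.util.Set[CleanupTaskWeakReference] =
    Collections.newSetFromMap(
      new ConcurrentHashMap[CleanupTaskWeakReference, java.lang.Boolean])

  def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(
      new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
    ()
  }
}
```

The buffer holds only the wrappers, never the registered objects, so registration does not prevent an object from being garbage collected.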

== [[doCleanupShuffle]] Shuffle Cleanup

[source, scala]

doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit

doCleanupShuffle performs a shuffle cleanup, which is to remove the shuffle from the current MapOutputTrackerMaster and BlockManagerMaster. doCleanupShuffle also notifies CleanerListeners.

Internally, when executed, doCleanupShuffle prints out the following DEBUG message to the logs:


Cleaning shuffle [id]

doCleanupShuffle uses SparkEnv to access the MapOutputTracker to unregister the given shuffle.

doCleanupShuffle uses SparkEnv to access the BlockManagerMaster to remove the shuffle blocks (for the given shuffleId).

doCleanupShuffle informs all registered <<listeners, CleanerListeners>> that the shuffle was cleaned.

In the end, doCleanupShuffle prints out the following DEBUG message to the logs:


Cleaned shuffle [id]

In case of any exception, doCleanupShuffle prints out the following ERROR message (and the exception itself) to the logs:


Error cleaning shuffle [id]

doCleanupShuffle is used when ContextCleaner is requested to <<keepCleaning, keepCleaning>> and (interestingly) while fitting an ALSModel (in Spark MLlib).
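The sequence of steps in a shuffle cleanup can be sketched as below. The traits are hypothetical stand-ins for the collaborators named in this section (they are not Spark's classes), the collaborators are passed in as parameters instead of being looked up through SparkEnv, and the `log` function replaces DEBUG and ERROR logging.

```scala
object ShuffleCleanupSketch {
  // Hypothetical stand-ins for the collaborators named in the text
  trait MapOutputTracker { def unregisterShuffle(shuffleId: Int): Unit }
  trait BlockManagerMaster { def removeShuffle(shuffleId: Int, blocking: Boolean): Unit }
  trait CleanerListener { def shuffleCleaned(shuffleId: Int): Unit }

  def doCleanupShuffle(
      shuffleId: Int,
      blocking: Boolean,
      tracker: MapOutputTracker,
      blockManagerMaster: BlockManagerMaster,
      listeners: Seq[CleanerListener],
      log: String => Unit): Unit = {
    try {
      log(s"Cleaning shuffle $shuffleId")
      // 1. Forget the map outputs of the shuffle
      tracker.unregisterShuffle(shuffleId)
      // 2. Remove the shuffle blocks
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      // 3. Tell the listeners
      listeners.foreach(_.shuffleCleaned(shuffleId))
      log(s"Cleaned shuffle $shuffleId")
    } catch {
      // A failed cleanup is reported rather than propagated
      case e: Exception => log(s"Error cleaning shuffle $shuffleId: $e")
    }
  }
}
```

Catching the exception inside the method keeps one failed cleanup from terminating whatever loop invoked it.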

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.ContextCleaner logger to see what happens inside.

Add the following line to conf/


Refer to Logging.

== [[internal-properties]] Internal Properties

=== [[referenceBuffer]] referenceBuffer

=== [[referenceQueue]] referenceQueue