ContextCleaner
ContextCleaner is a Spark service that is responsible for <
Creating Instance
ContextCleaner takes the following to be created:
- [[sc]] SparkContext.md[]
ContextCleaner is created and requested to start when SparkContext is created with configuration-properties.md#spark.cleaner.referenceTracking[spark.cleaner.referenceTracking] configuration property enabled.
== [[cleaningThread]] Spark Context Cleaner Cleaning Thread
ContextCleaner uses a daemon thread named Spark Context Cleaner to clean RDD, shuffle, and broadcast states.
The Spark Context Cleaner thread is started when ContextCleaner is requested to <
== [[listeners]][[attachListener]] CleanerListeners
ContextCleaner allows attaching core:CleanerListener.md[CleanerListeners] using the attachListener method so that they are informed when objects are cleaned.
[source,scala]
attachListener(listener: CleanerListener): Unit
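The listener mechanism described above can be sketched without Spark on the classpath. This is a minimal illustration, not the actual Spark code: `ShuffleCleanedListener` and `ListenerRegistry` are hypothetical names, and the real CleanerListener has more callbacks than the single one shown here.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical, trimmed-down listener with a single callback.
trait ShuffleCleanedListener {
  def shuffleCleaned(shuffleId: Int): Unit
}

// Hypothetical registry that mimics attaching and notifying listeners.
class ListenerRegistry {
  // A concurrent queue allows attaching listeners while a cleanup is in progress.
  private val listeners = new ConcurrentLinkedQueue[ShuffleCleanedListener]()

  def attachListener(listener: ShuffleCleanedListener): Unit = {
    listeners.add(listener)
  }

  // Calls every attached listener, in attachment order.
  def notifyShuffleCleaned(shuffleId: Int): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) {
      it.next().shuffleCleaned(shuffleId)
    }
  }
}
```

A concurrent collection is used here on the assumption that listeners may be attached from a different thread than the one doing the cleanup.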
== [[doCleanupRDD]] doCleanupRDD Method
[source, scala]
doCleanupRDD(rddId: Int, blocking: Boolean): Unit
doCleanupRDD...FIXME
doCleanupRDD is used when ContextCleaner is requested to <
== [[keepCleaning]] keepCleaning Internal Method
[source, scala]
keepCleaning(): Unit
keepCleaning runs indefinitely until ContextCleaner is requested to <
keepCleaning prints out the following DEBUG message to the logs:
[source,plaintext]
Got cleaning task [task]
keepCleaning is used in <
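The general shape of such a loop (a daemon thread that drains a reference queue until it is told to stop) can be sketched with JDK classes only. The class name `CleaningLoopSketch`, the thread name, the 100 ms timeout, and the `handled` queue are assumptions made for illustration; the real method dispatches each task to a matching cleanup routine instead of just recording the reference.

```scala
import java.lang.ref.{Reference, ReferenceQueue}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a cleaner that drains a reference queue.
class CleaningLoopSketch {
  val referenceQueue = new ReferenceQueue[AnyRef]
  // Records the references the loop has taken off the queue.
  val handled = new ConcurrentLinkedQueue[Reference[_ <: AnyRef]]()
  @volatile private var stopped = false

  private val cleaningThread = new Thread("Sketch Cleaner") {
    override def run(): Unit = keepCleaning()
  }

  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.start()
  }

  def stop(): Unit = {
    stopped = true
    cleaningThread.interrupt() // wake the thread if it is waiting on the queue
    cleaningThread.join()
  }

  def isRunning: Boolean = cleaningThread.isAlive

  private def keepCleaning(): Unit = {
    while (!stopped) {
      try {
        // Wait up to 100 ms for the next enqueued reference; null means timeout.
        Option(referenceQueue.remove(100L)).foreach { ref =>
          handled.add(ref)
        }
      } catch {
        case _: InterruptedException => () // re-check the stopped flag
      }
    }
  }
}
```

The timeout on `remove` keeps the loop responsive to the stop flag even when nothing is ever enqueued.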
== [[registerRDDCheckpointDataForCleanup]] registerRDDCheckpointDataForCleanup Method
[source, scala]
registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit
registerRDDCheckpointDataForCleanup...FIXME
registerRDDCheckpointDataForCleanup is used when ContextCleaner is requested to <
== [[registerBroadcastForCleanup]] registerBroadcastForCleanup Method
[source, scala]
registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit
registerBroadcastForCleanup...FIXME
registerBroadcastForCleanup is used when SparkContext is requested to SparkContext.md#broadcast[create a broadcast variable].
== [[registerRDDForCleanup]] registerRDDForCleanup Method
[source, scala]
registerRDDForCleanup(rdd: RDD[_]): Unit
registerRDDForCleanup...FIXME
registerRDDForCleanup is used for rdd:RDD.md#persist[RDD.persist] operation.
== [[registerAccumulatorForCleanup]] registerAccumulatorForCleanup Method
[source, scala]
registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit
registerAccumulatorForCleanup...FIXME
registerAccumulatorForCleanup is used when AccumulatorV2 is requested to register.
== [[stop]] Stopping ContextCleaner
[source, scala]
stop(): Unit
stop...FIXME
stop is used when SparkContext is requested to SparkContext.md#stop[stop].
== [[start]] Starting ContextCleaner
[source, scala]
start(): Unit
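start starts the Spark Context Cleaner cleaning thread and schedules an action to request the JVM garbage collector (using System.gc()) on a regular basis per configuration-properties.md#spark.cleaner.periodicGC.interval[spark.cleaner.periodicGC.interval] configuration property.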
The action to request the JVM GC is scheduled on <
start is used when SparkContext is created.
== [[periodicGCService]] periodicGCService Single-Thread Executor Service
periodicGCService is an internal single-thread {java-javadoc-url}/java/util/concurrent/ScheduledExecutorService.html[executor service] with the name context-cleaner-periodic-gc to request the JVM garbage collector.
The periodic runs are started when <
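A single-thread scheduled executor with a named daemon thread can be sketched with `java.util.concurrent` alone. The object name `PeriodicGcSketch` and both helper methods are hypothetical, and the millisecond interval is an assumption for illustration; it is not how Spark reads the configuration property.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}

object PeriodicGcSketch {
  // Creates a single-thread scheduler whose only thread is a named daemon.
  def newDaemonScheduler(threadName: String): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true)
        t
      }
    }
    Executors.newSingleThreadScheduledExecutor(factory)
  }

  // Runs `action` at a fixed rate; the first run is delayed by one interval.
  def schedulePeriodically(
      service: ScheduledExecutorService,
      intervalMillis: Long)(action: () => Unit): ScheduledFuture[_] = {
    val task = new Runnable {
      override def run(): Unit = action()
    }
    service.scheduleAtFixedRate(task, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
  }
}
```

With these helpers, requesting a JVM garbage collection on a schedule would look like `PeriodicGcSketch.schedulePeriodically(service, intervalMillis)(() => System.gc())`. Note that `System.gc()` is only a hint to the JVM.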
== [[registerShuffleForCleanup]] Registering ShuffleDependency for Cleanup
[source, scala]
registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit
registerShuffleForCleanup registers the given ShuffleDependency for cleanup.
Internally, registerShuffleForCleanup simply executes <
registerShuffleForCleanup is used when ShuffleDependency is created.
== [[registerForCleanup]] Registering Object Reference For Cleanup
[source, scala]
registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit
registerForCleanup adds the input objectForCleanup to the <
Despite the widest-possible AnyRef type of the input objectForCleanup, what is actually tracked is a CleanupTaskWeakReference, a custom subclass of Java's {java-javadoc-url}/java/lang/ref/WeakReference.html[java.lang.ref.WeakReference] that pairs the object with its CleanupTask.
registerForCleanup is used when ContextCleaner is requested to <
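The idea of pairing a weakly referenced object with the task to run once it is gone can be sketched with JDK classes only. `SketchTask`, `CleanShuffleSketch`, `TaskWeakReference`, and `CleanupRegistrySketch` are hypothetical names chosen for this illustration and are not Spark's own types.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cleanup tasks; the names are illustrative only.
sealed trait SketchTask
final case class CleanShuffleSketch(shuffleId: Int) extends SketchTask

// A weak reference that remembers which task to run once its referent is gone.
class TaskWeakReference(
    val task: SketchTask,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

class CleanupRegistrySketch {
  val referenceQueue = new ReferenceQueue[AnyRef]

  // Holds the weak references strongly so that they are not themselves
  // garbage collected before their cleanup task has been processed.
  val referenceBuffer: java.util.Set[TaskWeakReference] =
    Collections.newSetFromMap[TaskWeakReference](
      new ConcurrentHashMap[TaskWeakReference, java.lang.Boolean]())

  def registerForCleanup(objectForCleanup: AnyRef, task: SketchTask): Unit = {
    referenceBuffer.add(new TaskWeakReference(task, objectForCleanup, referenceQueue))
  }
}
```

Only the wrapper is held strongly. The registered object stays weakly reachable, so the garbage collector can reclaim it and then enqueue the wrapper on the reference queue.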
== [[doCleanupShuffle]] Shuffle Cleanup
[source, scala]
doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit
doCleanupShuffle performs a shuffle cleanup which is to remove the shuffle from the current scheduler:MapOutputTrackerMaster.md[MapOutputTrackerMaster] and storage:BlockManagerMaster.md[BlockManagerMaster]. doCleanupShuffle also notifies core:CleanerListener.md[CleanerListeners].
Internally, when executed, doCleanupShuffle prints out the following DEBUG message to the logs:
[source,plaintext]
Cleaning shuffle [id]
doCleanupShuffle uses core:SparkEnv.md[SparkEnv] to access the core:SparkEnv.md#mapOutputTracker[MapOutputTracker] to scheduler:MapOutputTracker.md#unregisterShuffle[unregister the given shuffle].
doCleanupShuffle uses core:SparkEnv.md[SparkEnv] to access the core:SparkEnv.md#blockManager[BlockManagerMaster] to storage:BlockManagerMaster.md#removeShuffle[remove the shuffle blocks] (for the given shuffleId).
doCleanupShuffle informs all registered <
In the end, doCleanupShuffle prints out the following DEBUG message to the logs:
[source,plaintext]
Cleaned shuffle [id]
In case of any exception, doCleanupShuffle prints out the following ERROR message (together with the exception itself) to the logs:
[source,plaintext]
Error cleaning shuffle [id]
doCleanupShuffle is used when ContextCleaner is requested to <ALSModel
(in Spark MLlib).
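The control flow of the shuffle cleanup described in this section (log, unregister, remove blocks, notify listeners, log again, and report any exception) can be sketched as a plain function. Everything here is hypothetical: the function parameters stand in for the map output tracker, the block manager master, the listeners, and the logger, and none of them are Spark APIs.

```scala
object ShuffleCleanupSketch {
  def cleanupShuffle(
      shuffleId: Int,
      unregisterShuffle: Int => Unit, // stands in for the map output tracker
      removeShuffle: Int => Unit,     // stands in for the block manager master
      listeners: Seq[Int => Unit],    // stand in for the attached listeners
      log: String => Unit): Unit = {
    try {
      log(s"Cleaning shuffle $shuffleId")
      unregisterShuffle(shuffleId)
      removeShuffle(shuffleId)
      listeners.foreach(listener => listener(shuffleId))
      log(s"Cleaned shuffle $shuffleId")
    } catch {
      // A failed cleanup is reported but does not propagate to the caller.
      case e: Exception =>
        log(s"Error cleaning shuffle $shuffleId")
        log(e.toString)
    }
  }
}
```

Catching the exception inside the function means one failed cleanup does not stop the calling thread, which is assumed here to be a long-running cleaning loop.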
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.ContextCleaner logger to see what happens inside.
Add the following line to conf/log4j.properties:
[source,plaintext]
log4j.logger.org.apache.spark.ContextCleaner=ALL
Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
=== [[referenceBuffer]] referenceBuffer
=== [[referenceQueue]] referenceQueue