= TaskContext
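`TaskContext` is the <<contract, contract>> of <<implementations, task contexts>> that:

* Hold contextual information about a scheduler:Task.md[task] at execution, e.g. <<attemptNumber, attemptNumber>>, <<partitionId, partitionId>>, <<stageId, stageId>>, <<taskAttemptId, taskAttemptId>>
* Give access to the lifecycle of a task, e.g. <<isCompleted, isCompleted>>, <<isInterrupted, isInterrupted>>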
A task can access the TaskContext instance using <<get, TaskContext.get>> (which returns `null` unless executed within the execution thread of a task).
[source, scala]
----
import org.apache.spark.TaskContext
val ctx = TaskContext.get
----
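Since `TaskContext.get` gives `null` outside the execution thread of a task, wrapping the call in an `Option` is one way to guard against it. The following is a minimal sketch that assumes a `spark-shell` session (with `sc` available, as in the other examples on this page); the value names are made up for illustration.

[source, scala]
----
import org.apache.spark.TaskContext

// On the driver (outside any task) there is no active TaskContext,
// so Option(...) turns the null into None.
val onDriver: Option[TaskContext] = Option(TaskContext.get)

// Inside a task the context is bound to the task's thread,
// so every partition can report its own ID.
val partitionIds = sc.range(0, 4, numSlices = 2)
  .mapPartitions { _ => Option(TaskContext.get).map(_.partitionId).iterator }
  .collect()
----

With two partitions, `partitionIds` should hold one ID per partition while `onDriver` stays empty.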
TaskContext allows for <<registering-task-listeners, registering task listeners>>.
[[contract]]
.TaskContext Contract
[cols="1m,3",options="header",width="100%"]
|===
| Method
| Description

| addTaskCompletionListener a| [[addTaskCompletionListener]]
[source, scala]
----
addTaskCompletionListener(
  listener: TaskCompletionListener): TaskContext
// a concrete method for a Scala-friendly usage
addTaskCompletionListener[U](
  f: (TaskContext) => U): TaskContext
----

Registers a TaskCompletionListener

Used when...FIXME

| addTaskFailureListener a| [[addTaskFailureListener]]
[source, scala]
----
addTaskFailureListener(
  listener: TaskFailureListener): TaskContext
// a concrete method for a Scala-friendly usage
addTaskFailureListener(
  f: (TaskContext, Throwable) => Unit): TaskContext
----

Registers a TaskFailureListener

Used when...FIXME
| attemptNumber a| [[attemptNumber]]
[source, scala]
----
attemptNumber(): Int
----

Number of times the task has been attempted so far (the first attempt is `0`)

Used when...FIXME

| fetchFailed a| [[fetchFailed]]
[source, scala]
----
fetchFailed: Option[FetchFailedException]
----

Used when...FIXME

| getKillReason a| [[getKillReason]]
[source, scala]
----
getKillReason(): Option[String]
----

Used when...FIXME
| getLocalProperties a| [[getLocalProperties]]
[source, scala]
----
getLocalProperties: Properties
----

Used when...FIXME

| getLocalProperty a| [[getLocalProperty]]
[source, scala]
----
getLocalProperty(key: String): String
----

Used when...FIXME

| getMetricsSources a| [[getMetricsSources]]
[source, scala]
----
getMetricsSources(sourceName: String): Seq[Source]
----

Metrics sources with the given `sourceName` that are associated with the instance that runs the task.

Used when...FIXME
| isCompleted a| [[isCompleted]]
[source, scala]
----
isCompleted(): Boolean
----

Used when...FIXME

| isInterrupted a| [[isInterrupted]]
[source, scala]
----
isInterrupted(): Boolean
----

Used when...FIXME

| isRunningLocally a| [[isRunningLocally]]
[source, scala]
----
isRunningLocally(): Boolean
----

Used when...FIXME

| killTaskIfInterrupted a| [[killTaskIfInterrupted]]
[source, scala]
----
killTaskIfInterrupted(): Unit
----

Used when...FIXME
| markInterrupted a| [[markInterrupted]]
[source, scala]
----
markInterrupted(reason: String): Unit
----

Used when...FIXME

| markTaskCompleted a| [[markTaskCompleted]]
[source, scala]
----
markTaskCompleted(error: Option[Throwable]): Unit
----

Used when...FIXME

| markTaskFailed a| [[markTaskFailed]]
[source, scala]
----
markTaskFailed(error: Throwable): Unit
----

Used when...FIXME
| partitionId a| [[partitionId]]
[source, scala]
----
partitionId(): Int
----

ID of the Partition computed by the task

Used when...FIXME

| registerAccumulator a| [[registerAccumulator]]
[source, scala]
----
registerAccumulator(a: AccumulatorV2[_, _]): Unit
----

Used when...FIXME

| setFetchFailed a| [[setFetchFailed]]
[source, scala]
----
setFetchFailed(fetchFailed: FetchFailedException): Unit
----

Used when...FIXME

| stageAttemptNumber a| [[stageAttemptNumber]]
[source, scala]
----
stageAttemptNumber(): Int
----

Used when...FIXME
| stageId a| [[stageId]]
[source, scala]
----
stageId(): Int
----

ID of the scheduler:Stage.md[Stage] the task belongs to

Used when...FIXME

| taskAttemptId a| [[taskAttemptId]]
[source, scala]
----
taskAttemptId(): Long
----

Task (execution) attempt ID

Used when...FIXME

| taskMemoryManager a| [[taskMemoryManager]]
[source, scala]
----
taskMemoryManager(): TaskMemoryManager
----

memory:TaskMemoryManager.md[TaskMemoryManager]

Used when...FIXME

| taskMetrics a| [[taskMetrics]]
[source, scala]
----
taskMetrics(): TaskMetrics
----

executor:TaskMetrics.md[]

Used when...FIXME

|===
[[implementations]]
.TaskContexts
[cols="1,3",options="header",width="100%"]
|===
| TaskContext
| Description

| BarrierTaskContext
| [[BarrierTaskContext]]

| TaskContextImpl
| [[TaskContextImpl]]

|===
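Among the contract methods, `getLocalProperty` reads a local property that was set upstream on the driver (and gives `null` if the key is missing). The following is a minimal sketch of that round trip, assuming a `spark-shell` session with `sc` available; the property key `demo.owner` and its value are hypothetical.

[source, scala]
----
import org.apache.spark.TaskContext

// Set a local property on the driver thread that submits the job.
// "demo.owner" is a made-up key used only for this illustration.
sc.setLocalProperty("demo.owner", "docs")

// Read the property back inside the tasks through the TaskContext.
val seen = sc.range(0, 2, numSlices = 2)
  .map { _ => TaskContext.get.getLocalProperty("demo.owner") }
  .collect()
  .distinct
----

Every task should see the same value, so `seen` is expected to contain the single element `docs`.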
== [[setTaskContext]] Setting Thread-Local TaskContext -- `setTaskContext` Object Method

[source, scala]
----
setTaskContext(tc: TaskContext): Unit
----

`setTaskContext` binds the given TaskContext as a thread-local variable.
[NOTE]
====
`setTaskContext` is used when:

* `Task` is requested to scheduler:Task.md#run[run] (when `Executor` is requested to executor:Executor.md#launchTask[launch a task (on "Executor task launch worker" thread pool) sometime in the future])

* other cases of less importance
====
== [[get]] Accessing Active TaskContext -- `get` Object Method

[source, scala]
----
get(): TaskContext
----

`get` returns the thread-local TaskContext instance (by requesting the `taskContext` thread-local variable to get the instance).
NOTE: `get` is a method of the `TaskContext` object in Scala, so there is just one instance of it (per classloader). The TaskContext instance itself is kept in a https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html[ThreadLocal] variable (`ThreadLocal[TaskContext]`), which makes it thread-local and so allows for associating state with the thread of a task.
[source, scala]
----
val rdd = sc.range(0, 3, numSlices = 3)

assert(rdd.partitions.size == 3)

rdd.foreach { n =>
  import org.apache.spark.TaskContext
  val tc = TaskContext.get
  val msg = s"""|-------------------
                |partitionId: ${tc.partitionId}
                |stageId: ${tc.stageId}
                |attemptNum: ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}
----
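The thread-local binding behind `setTaskContext` and `get` can be pictured with a small model in plain Scala (no Spark required). This is only a sketch of the pattern, not Spark's code: `DemoContext`, `DemoContextHolder` and `ThreadLocalDemo` are hypothetical names introduced for illustration.

[source, scala]
----
// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class DemoContext(partitionId: Int)

object DemoContextHolder {
  // One holder object (per classloader), but one slot per thread.
  private val current = new ThreadLocal[DemoContext]

  def set(ctx: DemoContext): Unit = current.set(ctx)
  def unset(): Unit = current.remove()
  // Returns null when nothing was bound on the calling thread.
  def get(): DemoContext = current.get()
}

object ThreadLocalDemo {
  // Binds a context on a fresh thread and reports the partition ID seen there.
  def partitionIdSeenOnNewThread(id: Int): Int = {
    var seen = -1
    val worker = new Thread(() => {
      DemoContextHolder.set(DemoContext(id))
      try seen = DemoContextHolder.get().partitionId
      finally DemoContextHolder.unset()
    })
    worker.start()
    worker.join()
    seen
  }
}
----

The thread that bound the context sees it, while any other thread (including the caller) still gets `null` from `DemoContextHolder.get()`, which mirrors why `TaskContext.get` gives `null` outside the execution thread of a task.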
== [[registering-task-listeners]] Registering Task Listeners
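Using the TaskContext object you can register task listeners for <<addTaskCompletionListener, task completion (regardless of the final state)>> and <<addTaskFailureListener, task failures only>>.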
=== [[addTaskCompletionListener]] `addTaskCompletionListener` Method

[source, scala]
----
addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
----

`addTaskCompletionListener` methods register a `TaskCompletionListener` listener to be executed on task completion.
NOTE: It will be executed regardless of the final state of a task - success, failure, or cancellation.
[source, scala]
----
val rdd = sc.range(0, 5, numSlices = 1)

import org.apache.spark.TaskContext
val printTaskInfo = (tc: TaskContext) => {
  val msg = s"""|-------------------
                |partitionId: ${tc.partitionId}
                |stageId: ${tc.stageId}
                |attemptNum: ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}

rdd.foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskCompletionListener(printTaskInfo)
}
----
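Because a completion listener runs whatever the final state of the task is, it is a natural place to release per-task resources. The following is a minimal sketch, assuming a `spark-shell` session with `sc` available and Spark 2.4 or later (where the function-based overload takes a type parameter); the `StringReader` merely stands in for a hypothetical resource opened by the task.

[source, scala]
----
import org.apache.spark.TaskContext

val total = sc.range(0, 6, numSlices = 2).mapPartitions { it =>
  // Hypothetical per-task resource; a StringReader is used only as a placeholder.
  val resource = new java.io.StringReader("per-task resource")

  // The explicit [Unit] type argument selects the function-based overload.
  TaskContext.get.addTaskCompletionListener[Unit] { _ => resource.close() }

  it
}.count
----

The listener is registered before the partition is consumed, so the resource is closed once the task finishes, whether it succeeds or not.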
=== [[addTaskFailureListener]] `addTaskFailureListener` Method

[source, scala]
----
addTaskFailureListener(listener: TaskFailureListener): TaskContext
addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext
----

`addTaskFailureListener` methods register a `TaskFailureListener` listener to be executed on task failure only. It can be executed multiple times since a task can be re-attempted when it fails.
[source, scala]
----
val rdd = sc.range(0, 2, numSlices = 2)

import org.apache.spark.TaskContext
val printTaskErrorInfo = (tc: TaskContext, error: Throwable) => {
  val msg = s"""|-------------------
                |partitionId: ${tc.partitionId}
                |stageId: ${tc.stageId}
                |attemptNum: ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |error: ${error.toString}
                |-------------------""".stripMargin
  println(msg)
}

val throwExceptionForOddNumber = (n: Long) => {
  if (n % 2 == 1) {
    throw new Exception(s"No way it will pass for odd number: $n")
  }
}

// FIXME It won't work.
rdd.map(throwExceptionForOddNumber).foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
}

// Listener registration matters.
rdd.mapPartitions { (it: Iterator[Long]) =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
  it
}.map(throwExceptionForOddNumber).count
----