TaskContext

TaskContext is the contract for task contexts that serve the following purposes:

  • Hold contextual information about a scheduler:Task.md[task] at execution, e.g. attemptNumber, partitionId, stageId, taskAttemptId

  • Give access to the lifecycle of a task, e.g. addTaskCompletionListener, addTaskFailureListener

A task can access its TaskContext instance using the TaskContext.get object method (which simply returns null unless called within the execution thread of a task).

[source, scala]

import org.apache.spark.TaskContext
val ctx = TaskContext.get
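
Because TaskContext.get returns null outside a task thread, wrapping the call in Option is one defensive way to read it. The following is a minimal sketch, assuming Spark is on the classpath (e.g. in spark-shell); the names maybeCtx and partition and the -1 fallback are illustrative only.

```scala
import org.apache.spark.TaskContext

// Option(null) is None, so this is None outside a task thread (e.g. on the driver)
// and Some(ctx) inside the execution thread of a task.
val maybeCtx: Option[TaskContext] = Option(TaskContext.get)

// -1 is an arbitrary marker chosen for this sketch to mean "no active task".
val partition: Int = maybeCtx.map(_.partitionId()).getOrElse(-1)
```

Evaluated on the driver, maybeCtx is empty; the same expressions inside an RDD operation would see the context of the running task.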


TaskContext allows for registering task listeners and accessing local properties that were set on the driver.
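
Reading a driver-side local property from inside a task could look like the sketch below. It assumes a spark-shell session where sc is the active SparkContext; the property key demo.owner and its value are made up for illustration.

```scala
import org.apache.spark.TaskContext

// Set on the driver; applies to jobs submitted from this thread afterwards.
sc.setLocalProperty("demo.owner", "alice")

// Each task reads the property through its own TaskContext.
val seenInTasks = sc.range(0, 2, numSlices = 2)
  .map { _ => TaskContext.get.getLocalProperty("demo.owner") }
  .collect()
```

Local properties are a natural fit for small pieces of per-job metadata that tasks need to read but never modify.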

[[contract]]
.TaskContext Contract
[cols="1m,3",options="header",width="100%"]
|===
| Method
| Description

| addTaskCompletionListener a| [[addTaskCompletionListener]]

[source, scala]

addTaskCompletionListener(
  listener: TaskCompletionListener): TaskContext
// a concrete method for a Scala-friendly usage
addTaskCompletionListener[U](
  f: (TaskContext) => U): TaskContext


Registers a TaskCompletionListener

Used when...FIXME

| addTaskFailureListener a| [[addTaskFailureListener]]

[source, scala]

addTaskFailureListener(
  listener: TaskFailureListener): TaskContext
// a concrete method for a Scala-friendly usage
addTaskFailureListener(
  f: (TaskContext, Throwable) => Unit): TaskContext


Registers a TaskFailureListener

Used when...FIXME

| attemptNumber a| [[attemptNumber]]

[source, scala]

attemptNumber(): Int

Number of times the task has been attempted so far (0 for the first attempt)

Used when...FIXME

| fetchFailed a| [[fetchFailed]]

[source, scala]

fetchFailed: Option[FetchFailedException]

Used when...FIXME

| getKillReason a| [[getKillReason]]

[source, scala]

getKillReason(): Option[String]

Used when...FIXME

| getLocalProperties a| [[getLocalProperties]]

[source, scala]

getLocalProperties: Properties

Used when...FIXME

| getLocalProperty a| [[getLocalProperty]]

[source, scala]

getLocalProperty(key: String): String

Used when...FIXME

| getMetricsSources a| [[getMetricsSources]]

[source, scala]

getMetricsSources(sourceName: String): Seq[Source]

Metrics sources with the given sourceName that are associated with the instance that runs the task.

Used when...FIXME

| isCompleted a| [[isCompleted]]

[source, scala]

isCompleted(): Boolean

Used when...FIXME

| isInterrupted a| [[isInterrupted]]

[source, scala]

isInterrupted(): Boolean

Used when...FIXME

| isRunningLocally a| [[isRunningLocally]]

[source, scala]

isRunningLocally(): Boolean

Used when...FIXME

| killTaskIfInterrupted a| [[killTaskIfInterrupted]]

[source, scala]

killTaskIfInterrupted(): Unit

Used when...FIXME

| markInterrupted a| [[markInterrupted]]

[source, scala]

markInterrupted(reason: String): Unit

Used when...FIXME

| markTaskCompleted a| [[markTaskCompleted]]

[source, scala]

markTaskCompleted(error: Option[Throwable]): Unit

Used when...FIXME

| markTaskFailed a| [[markTaskFailed]]

[source, scala]

markTaskFailed(error: Throwable): Unit

Used when...FIXME

| partitionId a| [[partitionId]]

[source, scala]

partitionId(): Int

ID of the spark-rdd-Partition.md[Partition] computed by the task

Used when...FIXME

| registerAccumulator a| [[registerAccumulator]]

[source, scala]

registerAccumulator(a: AccumulatorV2[_, _]): Unit

Used when...FIXME

| setFetchFailed a| [[setFetchFailed]]

[source, scala]

setFetchFailed(fetchFailed: FetchFailedException): Unit

Used when...FIXME

| stageAttemptNumber a| [[stageAttemptNumber]]

[source, scala]

stageAttemptNumber(): Int

Used when...FIXME

| stageId a| [[stageId]]

[source, scala]

stageId(): Int

ID of the scheduler:Stage.md[Stage] the task belongs to

Used when...FIXME

| taskAttemptId a| [[taskAttemptId]]

[source, scala]

taskAttemptId(): Long

Task (execution) attempt ID

Used when...FIXME

| taskMemoryManager a| [[taskMemoryManager]]

[source, scala]

taskMemoryManager(): TaskMemoryManager

memory:TaskMemoryManager.md[TaskMemoryManager]

Used when...FIXME

| taskMetrics a| [[taskMetrics]]

[source, scala]

taskMetrics(): TaskMetrics

executor:TaskMetrics.md[]

Used when...FIXME

|===

[[implementations]]
.TaskContexts
[cols="1,3",options="header",width="100%"]
|===
| TaskContext
| Description

| BarrierTaskContext
| [[BarrierTaskContext]]

| TaskContextImpl
| [[TaskContextImpl]]

|===

== [[setTaskContext]] Setting Thread-Local TaskContext -- setTaskContext Object Method

[source, scala]

setTaskContext(tc: TaskContext): Unit

setTaskContext binds the given TaskContext as a thread-local variable.

[NOTE]

setTaskContext is used when:

  • Task is requested to scheduler:Task.md#run[run] (which happens on the "Executor task launch worker" thread pool some time after Executor is requested to executor:Executor.md#launchTask[launch the task])

  • Other cases of less importance

== [[get]] Accessing Active TaskContext -- get Object Method

[source, scala]

get(): TaskContext

get returns the thread-local TaskContext instance (by requesting the taskContext thread-local variable to get the instance).

NOTE: get is a method of the TaskContext object in Scala, so there is just one instance of the object available (per classloader). The TaskContext instance itself is kept in a https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html[ThreadLocal] variable (ThreadLocal[TaskContext]), so it is thread-local, which allows for associating state with the thread of a task.

[source, scala]

val rdd = sc.range(0, 3, numSlices = 3)

assert(rdd.partitions.size == 3)

rdd.foreach { n =>
  import org.apache.spark.TaskContext
  val tc = TaskContext.get
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}
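
The thread-local mechanism described in the note can be illustrated without Spark. The sketch below is not Spark's implementation: DemoContextHolder and ThreadLocalDemo are hypothetical names, and a plain String stands in for a TaskContext. It only shows how a single holder object can hand each thread its own value.

```scala
// Hypothetical stand-in for the TaskContext object: one holder object,
// but a separate value per thread thanks to ThreadLocal.
object DemoContextHolder {
  private val current = new ThreadLocal[String]

  def set(ctx: String): Unit = current.set(ctx)
  def get: String = current.get // null when nothing is bound on the calling thread
  def unset(): Unit = current.remove()
}

object ThreadLocalDemo {
  // Returns what each worker thread saw and what the calling thread saw.
  def run(): (Seq[String], String) = {
    val seen = new Array[String](3)
    val threads = (0 until 3).map { i =>
      new Thread(() => {
        DemoContextHolder.set(s"context-$i") // bind a value to this thread
        seen(i) = DemoContextHolder.get      // read it back on the same thread
        DemoContextHolder.unset()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join makes the workers' writes visible here
    (seen.toSeq, DemoContextHolder.get)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each worker thread reads back only the value it bound itself, while the calling thread, which never bound anything, gets null. That mirrors TaskContext.get returning null outside a task thread.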


== [[registering-task-listeners]] Registering Task Listeners

Using the TaskContext object you can register task listeners for task completion and task failure.

=== [[addTaskCompletionListener]] addTaskCompletionListener Method

[source, scala]

addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext


addTaskCompletionListener methods register a TaskCompletionListener to be executed on task completion.

NOTE: It will be executed regardless of the final state of a task - success, failure, or cancellation.

[source, scala]

val rdd = sc.range(0, 5, numSlices = 1)

import org.apache.spark.TaskContext
val printTaskInfo = (tc: TaskContext) => {
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}

rdd.foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskCompletionListener(printTaskInfo)
}
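
Since a completion listener runs whatever the final state of the task, it is a common place for releasing per-task resources. The following is a minimal sketch under two assumptions: a spark-shell session with sc available, and a Spark version (2.4 or later) in which the function-based overload takes a type parameter. The ByteArrayOutputStream is only a stand-in for a real resource such as a connection.

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.TaskContext

val doubled = sc.range(0, 4, numSlices = 2).mapPartitions { it =>
  // Stand-in for a per-task resource (a connection, a file handle, ...)
  val resource = new ByteArrayOutputStream()

  // The explicit [Unit] selects the function-based overload.
  TaskContext.get.addTaskCompletionListener[Unit] { _ => resource.close() }

  // The iterator is consumed lazily, after this function has returned,
  // so the resource has to stay open until the task completes.
  it.map { n => resource.write(n.toInt); n * 2 }
}.collect()
```

Closing the resource in a listener rather than at the end of the mapPartitions function matters because the returned iterator is still being consumed after that function returns.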


=== [[addTaskFailureListener]] addTaskFailureListener Method

[source, scala]

addTaskFailureListener(listener: TaskFailureListener): TaskContext
addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext


addTaskFailureListener methods register a TaskFailureListener to be executed on task failure only. The listener can be executed multiple times since a task can be re-attempted when it fails.

[source, scala]

val rdd = sc.range(0, 2, numSlices = 2)

import org.apache.spark.TaskContext
val printTaskErrorInfo = (tc: TaskContext, error: Throwable) => {
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |error:         ${error.toString}
                |-------------------""".stripMargin
  println(msg)
}

val throwExceptionForOddNumber = (n: Long) => {
  if (n % 2 == 1) {
    throw new Exception(s"No way it will pass for odd number: $n")
  }
}

// FIXME It won't work.
// (The partition iterator is never consumed here, so the mapped function never runs.)
rdd.map(throwExceptionForOddNumber).foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
}

// Listener registration matters.
rdd.mapPartitions { (it: Iterator[Long]) =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
  it
}.map(throwExceptionForOddNumber).count



Last update: 2020-10-09