

== [[JobProgressListener]] JobProgressListener Spark Listener

JobProgressListener is a ROOT:SparkListener.md[] for spark-webui.md[web UI].

JobProgressListener intercepts the following ROOT:SparkListener.md#SparkListenerEvent[Spark events]:

.JobProgressListener Events
[cols="1,2",options="header",width="100%"]
|===
| Handler | Purpose

| <>
| Creates a <>. It updates <>, <>, <>, <>, <>, <> and <>.

| <>
| Removes an entry in <>. It also removes entries in <> and <>. It updates <>, <>, <>, <> and <>.

| <>
| Updates the StageUIData and JobUIData.

| <>
| Updates the task's StageUIData and JobUIData, and registers a new TaskUIData.

| <>
| Updates the task's StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.

| <>
|

| onEnvironmentUpdate
| Sets schedulingMode property using the current ROOT:configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] (from Spark Properties environment details).

Used in spark-webui-AllJobsPage.md[AllJobsPage] (for the Scheduling Mode), and to display pools in spark-webui-JobsTab.md[JobsTab] and spark-webui-StagesTab.md[StagesTab].

FIXME: Add the links/screenshots for pools.

| onBlockManagerAdded
| Records an executor and its block manager in the internal <> registry.

| onBlockManagerRemoved
| Removes the executor from the internal <> registry.

| onApplicationStart
| Records a Spark application's start time (in the internal startTime).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime and the event timeline) and spark-webui-jobs.md[Job page] (for the event timeline).

| onApplicationEnd
| Records a Spark application's end time (in the internal endTime).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime).

| onTaskGettingResult
| Does nothing.

FIXME: Why is this event intercepted at all?!
|===
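The dispatch pattern behind these handlers can be illustrated with a stripped-down sketch. The event classes, the `Listener` trait and the counters below are simplified stand-ins, not Spark's actual types; they only mirror the idea that every event gets a no-op default callback that a progress-tracking listener overrides to maintain counters:

[source, scala]
----
// Stand-in event types (illustrative, not Spark's SparkListenerEvent hierarchy)
case class JobStart(jobId: Int)
case class JobEnd(jobId: Int, succeeded: Boolean)

trait Listener {
  def onJobStart(event: JobStart): Unit = () // no-op by default
  def onJobEnd(event: JobEnd): Unit = ()     // no-op by default
}

// A progress-style listener that keeps simple counters,
// in the spirit of the registries and counters described below.
class ProgressListener extends Listener {
  var active = 0
  var completed = 0
  var failed = 0
  override def onJobStart(event: JobStart): Unit = active += 1
  override def onJobEnd(event: JobEnd): Unit = {
    active -= 1
    if (event.succeeded) completed += 1 else failed += 1
  }
}

val listener = new ProgressListener
listener.onJobStart(JobStart(0))
listener.onJobEnd(JobEnd(0, succeeded = true))
----

In Spark itself the bus delivers every posted event to every registered listener; a handler that does nothing (like onTaskGettingResult above) simply inherits the no-op default.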

=== [[updateAggregateMetrics]] updateAggregateMetrics Method

CAUTION: FIXME

=== [[registries]] Registries and Counters

JobProgressListener uses registries to collect information about job executions.

.JobProgressListener Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name | Description

| [[numCompletedStages]] numCompletedStages
|

| [[numFailedStages]] numFailedStages
|

| [[stageIdToData]] stageIdToData
| Holds <> per stage, i.e. the stage and stage attempt ids.

| [[stageIdToInfo]] stageIdToInfo
|

| [[stageIdToActiveJobIds]] stageIdToActiveJobIds
|

| [[poolToActiveStages]] poolToActiveStages
|

| [[activeJobs]] activeJobs
|

| [[completedJobs]] completedJobs
|

| [[failedJobs]] failedJobs
|

| [[jobIdToData]] jobIdToData
|

| [[jobGroupToJobIds]] jobGroupToJobIds
|

| [[pendingStages]] pendingStages
|

| [[activeStages]] activeStages
|

| [[completedStages]] completedStages
|

| [[skippedStages]] skippedStages
|

| [[failedStages]] failedStages
|

| [[executorIdToBlockManagerId]] executorIdToBlockManagerId
| The lookup table of storage:BlockManagerId.md[]s per executor id.

Used to track block managers so the Stage page can display Address in spark-webui-StagePage.md#ExecutorTable[Aggregated Metrics by Executor].

FIXME: How does Executors page collect the very same information?
|===

=== [[onJobStart]] onJobStart Callback

[source, scala]
----
onJobStart(jobStart: SparkListenerJobStart): Unit
----

onJobStart creates a <>. It updates <>, <>, <>, <>, <>, <> and <>.

onJobStart reads the optional Spark Job group id as spark.jobGroup.id (from properties in the input jobStart).

onJobStart then creates a JobUIData using the input jobStart with status attribute set to JobExecutionStatus.RUNNING and records it in <> and <> registries.

onJobStart looks up the job ids for the group id (in the <> registry) and adds the job id.

The internal <> is updated with scheduler:spark-scheduler-StageInfo.md[StageInfo] for the stage id (for every StageInfo in SparkListenerJobStart.stageInfos collection).

onJobStart records the stages of the job in <>.

onJobStart records scheduler:spark-scheduler-StageInfo.md[StageInfos] in <> and <>.
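The bookkeeping above can be sketched with plain Scala collections standing in for the registries; `JobData` and its fields are simplified, not Spark's actual UI data classes:

[source, scala]
----
import scala.collection.mutable

// Simplified stand-in for JobUIData
case class JobData(jobId: Int, var status: String, stageIds: Seq[Int])

val jobIdToData      = mutable.HashMap.empty[Int, JobData]
val activeJobs       = mutable.HashMap.empty[Int, JobData]
val jobGroupToJobIds = mutable.HashMap.empty[String, mutable.Set[Int]]
val pendingStages    = mutable.HashMap.empty[Int, String] // stageId -> stage name

def onJobStart(jobId: Int, group: Option[String], stageIds: Seq[Int]): Unit = {
  // create the job record with RUNNING status and register it
  val job = JobData(jobId, status = "RUNNING", stageIds)
  jobIdToData(jobId) = job
  activeJobs(jobId) = job
  // record the job id under its (optional) job group
  group.foreach(g => jobGroupToJobIds.getOrElseUpdate(g, mutable.Set.empty) += jobId)
  // every stage of a freshly-started job is pending
  stageIds.foreach(id => pendingStages(id) = s"stage-$id")
}

onJobStart(1, Some("etl"), Seq(0, 1))
----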

=== [[onJobEnd]] onJobEnd Method

[source, scala]
----
onJobEnd(jobEnd: SparkListenerJobEnd): Unit
----

onJobEnd removes an entry in <>. It also removes entries in <> and <>. It updates <>, <>, <>, <> and <>.

onJobEnd removes the job from <> registry. It removes stages from <> registry.

When completed successfully, the job is added to <> registry with status attribute set to JobExecutionStatus.SUCCEEDED. <> gets incremented.

When failed, the job is added to <> registry with status attribute set to JobExecutionStatus.FAILED. <> gets incremented.

For every stage in the job, the stage is removed from the active jobs (in <>), which can remove the entire entry when no active jobs remain.

Every pending stage in <> gets added to <>.
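Put together, the onJobEnd bookkeeping looks roughly like this sketch (simplified stand-in types, not Spark's): the job moves from the active registry to the completed or failed one, and its still-pending stages become skipped:

[source, scala]
----
import scala.collection.mutable

// Simplified stand-in for JobUIData
case class JobData(jobId: Int, var status: String, stageIds: Seq[Int])

val activeJobs    = mutable.HashMap(1 -> JobData(1, "RUNNING", Seq(0, 1)))
val pendingStages = mutable.Set(1) // stage 1 never got submitted
val completedJobs = mutable.ListBuffer.empty[JobData]
val failedJobs    = mutable.ListBuffer.empty[JobData]
val skippedStages = mutable.Set.empty[Int]
var numCompletedJobs = 0
var numFailedJobs = 0

def onJobEnd(jobId: Int, succeeded: Boolean): Unit =
  activeJobs.remove(jobId).foreach { job =>
    if (succeeded) { job.status = "SUCCEEDED"; completedJobs += job; numCompletedJobs += 1 }
    else           { job.status = "FAILED";    failedJobs += job;    numFailedJobs += 1 }
    // stages that were still pending when the job ended are skipped
    job.stageIds.filter(pendingStages.remove).foreach(skippedStages += _)
  }

onJobEnd(1, succeeded = true)
----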

=== [[onExecutorMetricsUpdate]] onExecutorMetricsUpdate Method

[source, scala]
----
onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
----

=== [[onTaskStart]] onTaskStart Method

[source, scala]
----
onTaskStart(taskStart: SparkListenerTaskStart): Unit
----

onTaskStart updates StageUIData and JobUIData, and registers a new TaskUIData.

onTaskStart takes TaskInfo from the input taskStart.

onTaskStart looks up the StageUIData for the stage and stage attempt ids (in the <> registry).

onTaskStart increments numActiveTasks and puts a TaskUIData for the task in stageData.taskData.

Ultimately, onTaskStart looks the stage up in the internal <> and, for each active job, reads its JobUIData (from <>) and increments its numActiveTasks.
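The per-stage part of this bookkeeping can be sketched as follows; `StageUIData` and `TaskUIData` here are simplified stand-ins keyed the same way as described above (stage data by stage id and attempt id, task data by task id):

[source, scala]
----
import scala.collection.mutable

// Simplified stand-ins for the UI data classes
case class TaskUIData(taskId: Long, var finished: Boolean = false)
class StageUIData {
  var numActiveTasks = 0
  val taskData = mutable.HashMap.empty[Long, TaskUIData]
}

// stage data is keyed by (stageId, stageAttemptId)
val stageIdToData = mutable.HashMap.empty[(Int, Int), StageUIData]

def onTaskStart(stageId: Int, attemptId: Int, taskId: Long): Unit = {
  val stageData = stageIdToData.getOrElseUpdate((stageId, attemptId), new StageUIData)
  stageData.numActiveTasks += 1          // one more running task in the stage
  stageData.taskData(taskId) = TaskUIData(taskId) // register the new task
}

onTaskStart(stageId = 0, attemptId = 0, taskId = 42L)
----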

=== [[onTaskEnd]] onTaskEnd Method

[source, scala]
----
onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
----

onTaskEnd updates the StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.

onTaskEnd takes TaskInfo from the input taskEnd.

NOTE: onTaskEnd does its processing when the TaskInfo is available and stageAttemptId is not -1.

onTaskEnd looks up the StageUIData for the stage and stage attempt ids (in the <> registry).

onTaskEnd saves accumulables in the StageUIData.

onTaskEnd reads the ExecutorSummary for the executor the task finished on.

Depending on the task end's reason, onTaskEnd increments the succeededTasks, killedTasks or failedTasks counter.

onTaskEnd adds the task's duration to taskTime.

onTaskEnd decrements the number of active tasks (in the StageUIData).

Again depending on the task end's reason, onTaskEnd computes errorMessage and updates the StageUIData.

CAUTION: FIXME Why is the same information in two different registries -- stageData and execSummary?!

If taskMetrics is available, <> is executed.

The task's TaskUIData is looked up in stageData.taskData, and updateTaskInfo and updateTaskMetrics are executed. errorMessage is updated.

onTaskEnd makes sure that the number of tasks in StageUIData (stageData.taskData) is not above <> and drops the excess.

Ultimately, onTaskEnd looks the stage up in the internal <> and, for each active job, reads its JobUIData (from <>). It then decrements numActiveTasks and increments numCompletedTasks, numKilledTasks or numFailedTasks depending on the task's end reason.
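The counter selection from the task-end reason can be sketched with a simplified reason ADT (Spark's real TaskEndReason hierarchy has more cases than shown here):

[source, scala]
----
// Simplified stand-in for Spark's TaskEndReason hierarchy
sealed trait TaskEndReason
case object Success extends TaskEndReason
case object TaskKilled extends TaskEndReason
case class TaskFailed(message: String) extends TaskEndReason

var numActiveTasks = 2 // two tasks currently running
var succeededTasks, killedTasks, failedTasks = 0
var errorMessage: Option[String] = None

def onTaskEnd(reason: TaskEndReason): Unit = {
  numActiveTasks -= 1 // the task is no longer active, whatever the reason
  reason match {
    case Success         => succeededTasks += 1
    case TaskKilled      => killedTasks += 1
    case TaskFailed(msg) => failedTasks += 1; errorMessage = Some(msg)
  }
}

onTaskEnd(Success)
onTaskEnd(TaskFailed("boom"))
----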

=== [[onStageSubmitted]] onStageSubmitted Method

[source, scala]
----
onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
----

=== [[onStageCompleted]] onStageCompleted Method

[source, scala]
----
onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
----

onStageCompleted updates the StageUIData and JobUIData.

onStageCompleted reads stageInfo from the input stageCompleted and records it in <> registry.

onStageCompleted looks up the StageUIData for the stage and the stage attempt ids in the <> registry.

onStageCompleted records accumulables in StageUIData.

onStageCompleted removes the stage from <> and <> registries.

If the stage completed successfully (i.e. has no failureReason), onStageCompleted adds the stage to <> registry and increments <> counter. It trims <>.

Otherwise, when the stage failed, onStageCompleted adds the stage to <> registry and increments <> counter. It trims <>.

Ultimately, onStageCompleted looks the stage up in the internal <> and, for each active job, reads its JobUIData (from <>). It then decrements numActiveStages. When completed successfully, it adds the stage to completedStageIndices; on failure, numFailedStages gets incremented.
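The success-or-failure branching above can be sketched as follows (stand-in types, not Spark's): a stage with no failureReason counts as completed, any other as failed:

[source, scala]
----
import scala.collection.mutable

// Simplified stand-in for scheduler StageInfo
case class StageInfo(stageId: Int, failureReason: Option[String])

val activeStages    = mutable.HashMap.empty[Int, StageInfo]
val completedStages = mutable.ListBuffer.empty[StageInfo]
val failedStages    = mutable.ListBuffer.empty[StageInfo]
var numCompletedStages, numFailedStages = 0

def onStageCompleted(stage: StageInfo): Unit = {
  activeStages.remove(stage.stageId) // the stage is no longer active
  if (stage.failureReason.isEmpty) { completedStages += stage; numCompletedStages += 1 }
  else                             { failedStages += stage;    numFailedStages += 1 }
}

activeStages(0) = StageInfo(0, None)
onStageCompleted(StageInfo(0, None))
onStageCompleted(StageInfo(1, Some("fetch failure")))
----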

=== [[JobUIData]] JobUIData

CAUTION: FIXME

=== [[blockManagerIds]] blockManagerIds Method

[source, scala]
----
blockManagerIds: Seq[BlockManagerId]
----

CAUTION: FIXME

=== [[StageUIData]] StageUIData

CAUTION: FIXME

=== [[settings]] Settings

.Spark Properties
[options="header",width="100%"]
|===
| Setting | Default Value | Description

| [[spark_ui_retainedJobs]] spark.ui.retainedJobs
| 1000
| The number of jobs to hold information about

| [[spark_ui_retainedStages]] spark.ui.retainedStages
| 1000
| The number of stages to hold information about

| [[spark_ui_retainedTasks]] spark.ui.retainedTasks
| 100000
| The number of tasks to hold information about
|===
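These limits bound the registries described earlier: when a collection of completed jobs, stages or tasks grows past its retained limit, the listener drops the oldest entries. A minimal sketch of such trimming, assuming (as an illustration, not Spark's exact policy) that roughly a tenth of the limit is dropped in one batch to avoid trimming on every event:

[source, scala]
----
import scala.collection.mutable

val retainedJobs = 1000 // default of spark.ui.retainedJobs

// Drop the oldest entries once the buffer reaches its retention limit.
def trimIfNecessary[T](data: mutable.ListBuffer[T], retained: Int): Unit =
  if (data.size >= retained) {
    val toRemove = math.max(retained / 10, 1) // batch size is illustrative
    data.remove(0, toRemove)                  // oldest entries sit at the front
  }

// 1000 completed jobs, ids 0..999
val completedJobs = mutable.ListBuffer.tabulate(1000)(identity)
trimIfNecessary(completedJobs, retainedJobs)
----

Trimming in batches keeps the amortized cost low at the price of holding slightly fewer entries than the configured limit right after a trim.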


Last update: 2020-10-12