== [[JobProgressListener]] JobProgressListener Spark Listener

`JobProgressListener` is a SparkListener.md[] for spark-webui.md[web UI].

`JobProgressListener` intercepts the following SparkListener.md#SparkListenerEvent[Spark events].
.JobProgressListener Events
[cols="1,2",options="header",width="100%"]
|===
| Handler | Purpose

| <<onJobStart, onJobStart>>
| Creates a <<JobUIData, JobUIData>> and updates the internal registries, e.g. <<jobIdToData, jobIdToData>> and <<activeJobs, activeJobs>>.

| <<onJobEnd, onJobEnd>>
| Removes the job from <<activeJobs, activeJobs>> and records it in <<completedJobs, completedJobs>> or <<failedJobs, failedJobs>> (per the job result).

| <<onStageCompleted, onStageCompleted>>
| Updates the `StageUIData` and `JobUIData`.

| <<onTaskStart, onTaskStart>>
| Updates the task's `StageUIData` and `JobUIData`, and registers a new `TaskUIData`.

| <<onTaskEnd, onTaskEnd>>
| Updates the task's `StageUIData` (and `TaskUIData`), `ExecutorSummary`, and `JobUIData`.

| <<onExecutorMetricsUpdate, onExecutorMetricsUpdate>>
|

| <<onStageSubmitted, onStageSubmitted>>
|

| onEnvironmentUpdate
| Sets the `schedulingMode` property using the current configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] (from the Spark Properties environment details).

Used in spark-webui-AllJobsPage.md[AllJobsPage] (for the Scheduling Mode), and to display pools in spark-webui-JobsTab.md[JobsTab] and spark-webui-StagesTab.md[StagesTab].

FIXME: Add the links/screenshots for pools.

| onBlockManagerAdded
| Records an executor and its block manager in the internal <<executorIdToBlockManagerId, executorIdToBlockManagerId>> registry.

| onBlockManagerRemoved
| Removes the executor from the internal <<executorIdToBlockManagerId, executorIdToBlockManagerId>> registry.

| onApplicationStart
| Records a Spark application's start time (in the internal `startTime`).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime and the event timeline) and spark-webui-jobs.md[Job page] (for the event timeline).

| onApplicationEnd
| Records a Spark application's end time (in the internal `endTime`).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime).

| onTaskGettingResult
| Does nothing.

FIXME: Why is this event intercepted at all?!
|===
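The event-interception pattern above can be modeled in a few lines of plain Scala. This is an illustrative sketch only, not Spark's actual `SparkListener` API; the `Event` and `ProgressListener` types are hypothetical stand-ins:

```scala
// A simplified model (not Spark's API) of the dispatch pattern above:
// every event subtype gets a dedicated handler branch.
sealed trait Event
case class JobStart(jobId: Int) extends Event
case class JobEnd(jobId: Int, succeeded: Boolean) extends Event

class ProgressListener {
  var activeJobs = Set.empty[Int]
  var completedJobs = List.empty[Int]
  var failedJobs = List.empty[Int]

  // The pattern match plays the role of the onJobStart/onJobEnd callbacks.
  def onEvent(event: Event): Unit = event match {
    case JobStart(id) => activeJobs += id
    case JobEnd(id, ok) =>
      activeJobs -= id
      if (ok) completedJobs = id :: completedJobs
      else failedJobs = id :: failedJobs
  }
}
```

The real listener follows the same shape: one handler per event type, each mutating a set of internal registries.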
=== [[updateAggregateMetrics]] updateAggregateMetrics Method

CAUTION: FIXME
=== [[registries]] Registries and Counters

`JobProgressListener` uses registries to collect information about job executions.

.JobProgressListener Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name | Description

| [[numCompletedStages]] `numCompletedStages`
|

| [[numFailedStages]] `numFailedStages`
|

| [[stageIdToData]] `stageIdToData`
| Holds <<StageUIData, StageUIData>> per stage, i.e. the stage and stage attempt ids.

| [[stageIdToInfo]] `stageIdToInfo`
|

| [[stageIdToActiveJobIds]] `stageIdToActiveJobIds`
|

| [[poolToActiveStages]] `poolToActiveStages`
|

| [[activeJobs]] `activeJobs`
|

| [[completedJobs]] `completedJobs`
|

| [[failedJobs]] `failedJobs`
|

| [[jobIdToData]] `jobIdToData`
|

| [[jobGroupToJobIds]] `jobGroupToJobIds`
|

| [[pendingStages]] `pendingStages`
|

| [[activeStages]] `activeStages`
|

| [[completedStages]] `completedStages`
|

| [[skippedStages]] `skippedStages`
|

| [[failedStages]] `failedStages`
|

| [[executorIdToBlockManagerId]] `executorIdToBlockManagerId`
| The lookup table of storage:BlockManagerId.md[]s per executor id.

Used to track block managers so the Stage page can display `Address` in spark-webui-StagePage.md#ExecutorTable[Aggregated Metrics by Executor].

FIXME: How does Executors page collect the very same information?
|===
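The registries are essentially mutable lookup tables. A simplified sketch (hypothetical types, not Spark's internals) of the `stageIdToData` shape, keyed by stage and stage attempt ids:

```scala
import scala.collection.mutable

// Hypothetical, simplified shapes for the registries above: mutable
// lookup tables keyed by (stage id, stage attempt id) or by stage id.
case class StageData(var numActiveTasks: Int = 0)

val stageIdToData = mutable.HashMap.empty[(Int, Int), StageData]
val stageIdToActiveJobIds = mutable.HashMap.empty[Int, mutable.Set[Int]]

// getOrElseUpdate gives the "look up or register" behavior the event
// handlers rely on when an event arrives for a yet-unseen stage.
def stageData(stageId: Int, attemptId: Int): StageData =
  stageIdToData.getOrElseUpdate((stageId, attemptId), StageData())
```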
=== [[onJobStart]] onJobStart Callback

[source, scala]
----
onJobStart(jobStart: SparkListenerJobStart): Unit
----

`onJobStart` creates a <<JobUIData, JobUIData>> and updates the internal registries.

`onJobStart` reads the optional Spark Job group id as `spark.jobGroup.id` (from `properties` in the input `jobStart`).

`onJobStart` then creates a `JobUIData` using the input `jobStart` with the `status` attribute set to `JobExecutionStatus.RUNNING` and records it in the <<jobIdToData, jobIdToData>> and <<activeJobs, activeJobs>> registries.

`onJobStart` looks the job ids for the group id up (in the <<jobGroupToJobIds, jobGroupToJobIds>> registry) and adds the job id.

The internal <<pendingStages, pendingStages>> registry is updated with the `StageInfo` per stage id (for every `StageInfo` in the `SparkListenerJobStart.stageInfos` collection).

`onJobStart` records the stages of the job in <<stageIdToActiveJobIds, stageIdToActiveJobIds>>.

`onJobStart` records the ``StageInfo``s in <<stageIdToInfo, stageIdToInfo>> and <<stageIdToData, stageIdToData>>.
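The bookkeeping above can be sketched as follows. This is a simplified model: `JobData` and the plain string status are stand-ins for `JobUIData` and `JobExecutionStatus`:

```scala
import scala.collection.mutable

// A hedged sketch of the onJobStart bookkeeping; the registry names
// mirror the section above, the types are simplified stand-ins.
case class JobData(jobId: Int, var status: String)

val jobIdToData = mutable.HashMap.empty[Int, JobData]
val activeJobs  = mutable.HashMap.empty[Int, JobData]
val jobGroupToJobIds = mutable.HashMap.empty[String, mutable.Set[Int]]
val stageIdToActiveJobIds = mutable.HashMap.empty[Int, mutable.Set[Int]]

def onJobStart(jobId: Int, jobGroup: Option[String], stageIds: Seq[Int]): Unit = {
  val data = JobData(jobId, "RUNNING")          // JobExecutionStatus.RUNNING
  jobIdToData(jobId) = data                     // record in jobIdToData...
  activeJobs(jobId) = data                      // ...and activeJobs
  jobGroup.foreach { group =>                   // optional spark.jobGroup.id
    jobGroupToJobIds.getOrElseUpdate(group, mutable.Set.empty) += jobId
  }
  stageIds.foreach { stageId =>                 // the stages of the job
    stageIdToActiveJobIds.getOrElseUpdate(stageId, mutable.Set.empty) += jobId
  }
}
```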
=== [[onJobEnd]] onJobEnd Method

[source, scala]
----
onJobEnd(jobEnd: SparkListenerJobEnd): Unit
----

`onJobEnd` removes an entry for the job in the <<activeJobs, activeJobs>> registry.

`onJobEnd` removes the job's stages from the <<pendingStages, pendingStages>> registry.

When completed successfully, the job is added to the <<completedJobs, completedJobs>> registry with the `status` attribute set to `JobExecutionStatus.SUCCEEDED`. `numCompletedJobs` gets incremented.

When failed, the job is added to the <<failedJobs, failedJobs>> registry with the `status` attribute set to `JobExecutionStatus.FAILED`. `numFailedJobs` gets incremented.

For every stage in the job, the stage is removed from the active jobs (in <<stageIdToActiveJobIds, stageIdToActiveJobIds>>), which can remove the entire entry if no active jobs remain.

Every pending stage in <<stageIdToInfo, stageIdToInfo>> gets marked as skipped and added to the <<skippedStages, skippedStages>> registry.
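The success and failure branches can be sketched as follows (simplified; the real callback also performs the stage-level cleanup described above):

```scala
import scala.collection.mutable

// A sketch of the success/failure branch of onJobEnd; JobData and the
// string statuses are stand-ins for JobUIData and JobExecutionStatus.
case class JobData(jobId: Int, var status: String)

val activeJobs    = mutable.HashMap(1 -> JobData(1, "RUNNING"))
val completedJobs = mutable.ListBuffer.empty[JobData]
val failedJobs    = mutable.ListBuffer.empty[JobData]

def onJobEnd(jobId: Int, succeeded: Boolean): Unit =
  activeJobs.remove(jobId).foreach { data =>
    if (succeeded) { data.status = "SUCCEEDED"; completedJobs += data }
    else           { data.status = "FAILED";    failedJobs += data }
  }
```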
=== [[onExecutorMetricsUpdate]] onExecutorMetricsUpdate Method

[source, scala]
----
onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
----
=== [[onTaskStart]] onTaskStart Method

[source, scala]
----
onTaskStart(taskStart: SparkListenerTaskStart): Unit
----

`onTaskStart` updates `StageUIData` and `JobUIData`, and registers a new `TaskUIData`.

`onTaskStart` takes the `TaskInfo` from the input `taskStart`.

`onTaskStart` looks the `StageUIData` for the stage and stage attempt ids up (in the <<stageIdToData, stageIdToData>> registry).

`onTaskStart` increments `numActiveTasks` and puts a `TaskUIData` for the task in `stageData.taskData`.

Ultimately, `onTaskStart` looks the stage up in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its `JobUIData` (from <<jobIdToData, jobIdToData>>) and increments `numActiveTasks`.
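The final fan-out step can be sketched as follows (hypothetical simplified types; one stage may belong to several active jobs, and each job's counter is bumped):

```scala
import scala.collection.mutable

// A sketch of the final step of onTaskStart: the stage's active jobs
// are looked up and every such job's numActiveTasks is incremented.
case class JobData(var numActiveTasks: Int = 0)

val stageIdToActiveJobIds = mutable.HashMap(7 -> mutable.Set(1, 2))
val jobIdToData = mutable.HashMap(1 -> JobData(), 2 -> JobData())

def bumpActiveTasks(stageId: Int): Unit =
  for {
    jobIds <- stageIdToActiveJobIds.get(stageId) // unknown stage: no-op
    jobId  <- jobIds
    job    <- jobIdToData.get(jobId)
  } job.numActiveTasks += 1
```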
=== [[onTaskEnd]] onTaskEnd Method

[source, scala]
----
onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
----

`onTaskEnd` updates the `StageUIData` (and `TaskUIData`), `ExecutorSummary`, and `JobUIData`.

`onTaskEnd` takes the `TaskInfo` from the input `taskEnd`.

NOTE: `onTaskEnd` does its processing when the `TaskInfo` is available and `stageAttemptId` is not `-1`.

`onTaskEnd` looks the `StageUIData` for the stage and stage attempt ids up (in the <<stageIdToData, stageIdToData>> registry).

`onTaskEnd` saves `accumulables` in the `StageUIData`.

`onTaskEnd` reads the `ExecutorSummary` for the executor (that the task has finished on).

Depending on the task end's reason, `onTaskEnd` increments the `succeededTasks`, `killedTasks` or `failedTasks` counters.

`onTaskEnd` adds the task's duration to `taskTime`.

`onTaskEnd` decrements the number of active tasks (in the `StageUIData`).

Again, depending on the task end's reason, `onTaskEnd` computes `errorMessage` and updates the `StageUIData`.

CAUTION: FIXME Why is the same information in two different registries -- `stageData` and `execSummary`?!

If `taskMetrics` is available, <<updateAggregateMetrics, updateAggregateMetrics>> is executed.

The task's `TaskUIData` is looked up in `stageData.taskData`, and `updateTaskInfo` and `updateTaskMetrics` are executed. `errorMessage` is updated.

`onTaskEnd` makes sure that the number of tasks in the `StageUIData` (`stageData.taskData`) is not above <<spark_ui_retainedTasks, spark.ui.retainedTasks>> and drops the excess.

Ultimately, `onTaskEnd` looks the stage up in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its `JobUIData` (from <<jobIdToData, jobIdToData>>). It then decrements `numActiveTasks` and increments `numCompletedTasks`, `numKilledTasks` or `numFailedTasks` depending on the task's end reason.
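The "drops the excess" step can be sketched as follows. This is illustrative only: the real eviction policy trims in bulk and differs in detail, and the types here are simplified stand-ins:

```scala
import scala.collection.mutable

// A sketch of capping taskData at a retained-tasks limit: once the map
// grows past the limit, completed (non-RUNNING) entries are evicted,
// oldest first (LinkedHashMap preserves insertion order).
val retainedTasks = 3
val taskData = mutable.LinkedHashMap.empty[Long, String]  // taskId -> status

def record(taskId: Long, status: String): Unit = {
  taskData(taskId) = status
  while (taskData.size > retainedTasks) {
    taskData.find(_._2 != "RUNNING") match {
      case Some((id, _)) => taskData.remove(id)
      case None          => return  // nothing evictable yet
    }
  }
}
```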
=== [[onStageSubmitted]] onStageSubmitted Method

[source, scala]
----
onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
----
=== [[onStageCompleted]] onStageCompleted Method

[source, scala]
----
onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
----

`onStageCompleted` updates the `StageUIData` and `JobUIData`.

`onStageCompleted` reads `stageInfo` from the input `stageCompleted` and records it in the <<stageIdToInfo, stageIdToInfo>> registry.

`onStageCompleted` looks the `StageUIData` for the stage and the stage attempt ids up in the <<stageIdToData, stageIdToData>> registry.

`onStageCompleted` records `accumulables` in the `StageUIData`.

`onStageCompleted` removes the stage from the <<poolToActiveStages, poolToActiveStages>> and <<activeStages, activeStages>> registries.

If the stage completed successfully (i.e. has no `failureReason`), `onStageCompleted` adds the stage to the <<completedStages, completedStages>> registry and increments the <<numCompletedStages, numCompletedStages>> counter.

Otherwise, when the stage failed, `onStageCompleted` adds the stage to the <<failedStages, failedStages>> registry and increments the <<numFailedStages, numFailedStages>> counter.

Ultimately, `onStageCompleted` looks the stage up in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its `JobUIData` (from <<jobIdToData, jobIdToData>>). It then decrements `numActiveStages`. When completed successfully, it adds the stage to `completedStageIndices`. With failure, `numFailedStages` gets incremented.
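The success/failure branch can be sketched as follows (simplified; the `StageInfo` here is a hypothetical stand-in with only the fields the branch needs):

```scala
import scala.collection.mutable

// A sketch of the branch in onStageCompleted: a stage with no
// failureReason is completed, otherwise it failed.
case class StageInfo(stageId: Int, failureReason: Option[String])

val completedStages = mutable.ListBuffer.empty[StageInfo]
val failedStages    = mutable.ListBuffer.empty[StageInfo]
var numCompletedStages = 0
var numFailedStages    = 0

def onStageCompleted(stage: StageInfo): Unit =
  if (stage.failureReason.isEmpty) { completedStages += stage; numCompletedStages += 1 }
  else                             { failedStages += stage;    numFailedStages += 1 }
```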
=== [[JobUIData]] JobUIData

CAUTION: FIXME

=== [[blockManagerIds]] blockManagerIds Method

[source, scala]
----
blockManagerIds: Seq[BlockManagerId]
----

CAUTION: FIXME

=== [[StageUIData]] StageUIData

CAUTION: FIXME
=== [[settings]] Settings

.Spark Properties
[options="header",width="100%"]
|===
| Setting | Default Value | Description

| [[spark_ui_retainedJobs]] `spark.ui.retainedJobs`
| `1000`
| The number of jobs to hold information about

| [[spark_ui_retainedStages]] `spark.ui.retainedStages`
| `1000`
| The number of stages to hold information about

| [[spark_ui_retainedTasks]] `spark.ui.retainedTasks`
| `100000`
| The number of tasks to hold information about
|===
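Resolving such settings with their defaults can be sketched as follows (a simplified stand-in for `SparkConf`; the `Map` plays the role of user-provided properties):

```scala
// A stand-in for SparkConf.getInt-style lookups: return the user-set
// value when present, otherwise fall back to the documented default.
val conf = Map("spark.ui.retainedJobs" -> "500")  // hypothetical user setting

def getInt(key: String, default: Int): Int =
  conf.get(key).map(_.toInt).getOrElse(default)

val retainedJobs   = getInt("spark.ui.retainedJobs", 1000)     // overridden: 500
val retainedStages = getInt("spark.ui.retainedStages", 1000)   // default
val retainedTasks  = getInt("spark.ui.retainedTasks", 100000)  // default
```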