
= ShuffleWriteMetrics

ShuffleWriteMetrics is a collection of <<accumulators, accumulators>> that represents task metrics about writing shuffle data.

ShuffleWriteMetrics tracks the following task metrics:

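  1. <<bytesWritten, Shuffle Bytes Written>>
  2. <<writeTime, Shuffle Write Time>>
  3. <<recordsWritten, Shuffle Records Written>>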

NOTE: ROOT:spark-accumulators.md[Accumulators] allow tasks (running on executors) to communicate with the driver.

[[accumulators]]
.ShuffleWriteMetrics's Accumulators
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| [[_bytesWritten]] _bytesWritten | Accumulator to track how many shuffle bytes were written in a shuffle task.

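Used when ShuffleWriteMetrics is requested for the <<bytesWritten, number of shuffle bytes written>> and to <<incBytesWritten, increment>> or <<decBytesWritten, decrement>> it.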

NOTE: _bytesWritten is available as internal.metrics.shuffle.write.bytesWritten (internally shuffleWrite.BYTES_WRITTEN) in executor:TaskMetrics.md[TaskMetrics].

| [[_writeTime]] _writeTime | Accumulator to track shuffle write time (as 64-bit integer) of a shuffle task.

Used when ShuffleWriteMetrics is requested for the <<writeTime, shuffle write time>> and to <<incWriteTime, increment>> it.

NOTE: _writeTime is available as internal.metrics.shuffle.write.writeTime (internally shuffleWrite.WRITE_TIME) in executor:TaskMetrics.md[TaskMetrics].

| [[_recordsWritten]] _recordsWritten | Accumulator to track how many shuffle records were written in a shuffle task.

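Used when ShuffleWriteMetrics is requested for the <<recordsWritten, number of shuffle records written>> and to <<incRecordsWritten, increment>> or <<decRecordsWritten, decrement>> it.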

NOTE: _recordsWritten is available as internal.metrics.shuffle.write.recordsWritten (internally shuffleWrite.RECORDS_WRITTEN) in executor:TaskMetrics.md[TaskMetrics].

|===
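To make the table above concrete, the following is a minimal sketch of how three long-valued accumulators could back the metrics. It is not Spark's source: `SimpleLongAccumulator`, `ShuffleWriteMetricsSketch` and `namedValues` are hypothetical names introduced for illustration, and only the accumulator names in the map are taken from the notes above.

```scala
// Hypothetical stand-in for a long accumulator: it only keeps a running sum.
final class SimpleLongAccumulator {
  private var total = 0L
  def add(v: Long): Unit = { total += v }
  def sum: Long = total
}

// Illustrative model only (an assumption of this sketch, not Spark's class).
final class ShuffleWriteMetricsSketch {
  private val _bytesWritten = new SimpleLongAccumulator
  private val _writeTime = new SimpleLongAccumulator
  private val _recordsWritten = new SimpleLongAccumulator

  // Getters return the current sum of the backing accumulator.
  def bytesWritten: Long = _bytesWritten.sum
  def writeTime: Long = _writeTime.sum
  def recordsWritten: Long = _recordsWritten.sum

  // Increment methods add v to the backing accumulator.
  def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
  def incWriteTime(v: Long): Unit = _writeTime.add(v)
  def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)

  // Values keyed by the accumulator names listed in the table notes.
  def namedValues: Map[String, Long] = Map(
    "internal.metrics.shuffle.write.bytesWritten" -> bytesWritten,
    "internal.metrics.shuffle.write.writeTime" -> writeTime,
    "internal.metrics.shuffle.write.recordsWritten" -> recordsWritten
  )
}

object ShuffleWriteMetricsSketchDemo {
  def main(args: Array[String]): Unit = {
    val metrics = new ShuffleWriteMetricsSketch
    metrics.incBytesWritten(100L)
    metrics.incBytesWritten(28L)
    metrics.incRecordsWritten(3L)
    metrics.namedValues.foreach { case (name, value) => println(s"$name = $value") }
  }
}
```

Each increment is additive, so repeated calls from different write paths of the same task accumulate into one value per metric.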

== [[decRecordsWritten]] decRecordsWritten Method

CAUTION: FIXME

== [[decBytesWritten]] decBytesWritten Method

CAUTION: FIXME

== [[writeTime]] writeTime Method

CAUTION: FIXME

== [[recordsWritten]] recordsWritten Method

CAUTION: FIXME

== [[bytesWritten]] Returning Number of Shuffle Bytes Written -- bytesWritten Method

[source, scala]
----
bytesWritten: Long
----

bytesWritten is the number of shuffle bytes written by a shuffle task.

Internally, bytesWritten returns the sum of the <<_bytesWritten, _bytesWritten>> internal accumulator.

[NOTE]
====
bytesWritten is used when:

  1. ShuffleWriteMetricsUIData is created

  2. In <<decBytesWritten, decBytesWritten>>

  3. spark-SparkListener-StatsReportListener.md#onStageCompleted[StatsReportListener intercepts stage completed events] to show shuffle bytes written

  4. shuffle:ShuffleExternalSorter.md#writeSortedFile[ShuffleExternalSorter does writeSortedFile] (to incDiskBytesSpilled)

  5. spark-history-server:JsonProtocol.md#taskMetricsToJson[JsonProtocol converts ShuffleWriteMetrics to JSON]

  6. spark-webui-executors-ExecutorsListener.md#onTaskEnd[ExecutorsListener intercepts task end events] to update executor metrics

  7. spark-webui-JobProgressListener.md#updateAggregateMetrics[JobProgressListener updates stage and executor metrics]
====

== [[incBytesWritten]] Incrementing Shuffle Bytes Written Metrics -- incBytesWritten Method

[source, scala]
----
incBytesWritten(v: Long): Unit
----

incBytesWritten adds v to the <<_bytesWritten, _bytesWritten>> internal accumulator.

[NOTE]
====
incBytesWritten is used when:

  1. shuffle:UnsafeShuffleWriter.md#mergeSpills[UnsafeShuffleWriter does mergeSpills]

  2. storage:DiskBlockObjectWriter.md#updateBytesWritten[DiskBlockObjectWriter does updateBytesWritten]

  3. spark-history-server:JsonProtocol.md#taskMetricsFromJson[JsonProtocol creates TaskMetrics from JSON]
====

== [[incWriteTime]] Incrementing Shuffle Write Time Metrics -- incWriteTime Method

[source, scala]
----
incWriteTime(v: Long): Unit
----

incWriteTime adds v to the <<_writeTime, _writeTime>> internal accumulator.

[NOTE]
====
incWriteTime is used when:

  1. shuffle:SortShuffleWriter.md#stop[SortShuffleWriter stops].

  2. BypassMergeSortShuffleWriter shuffle:BypassMergeSortShuffleWriter.md#write[writes records] (i.e. when it initializes DiskBlockObjectWriter partition writers) and later when it shuffle:BypassMergeSortShuffleWriter.md#writePartitionedFile[concatenates per-partition files into a single file].

  3. shuffle:UnsafeShuffleWriter.md#mergeSpillsWithTransferTo[UnsafeShuffleWriter does mergeSpillsWithTransferTo].

  4. storage:DiskBlockObjectWriter.md#commitAndGet[DiskBlockObjectWriter does commitAndGet] (but only when the syncWrites flag is enabled, which forces outstanding writes to disk).

  5. spark-history-server:JsonProtocol.md#taskMetricsFromJson[JsonProtocol creates TaskMetrics from JSON].

  6. TimeTrackingOutputStream performs its operations (it is an output stream that tracks shuffle write time).
====
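The idea of an output stream that tracks write time can be sketched as a thin wrapper that times every call and reports the elapsed time to a callback. This is only an illustration: `TimedOutputStream` and `reportWriteTime` are hypothetical names (the callback stands in for a call such as incWriteTime), and measuring in nanoseconds with `System.nanoTime` is an assumption of this sketch.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical wrapper; `reportWriteTime` stands in for a call such as incWriteTime.
final class TimedOutputStream(underlying: OutputStream, reportWriteTime: Long => Unit)
    extends OutputStream {

  // Runs `op` and reports its elapsed time in nanoseconds, even if `op` throws.
  private def timed(op: => Unit): Unit = {
    val start = System.nanoTime()
    try op
    finally reportWriteTime(System.nanoTime() - start)
  }

  override def write(b: Int): Unit = timed(underlying.write(b))

  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    timed(underlying.write(b, off, len))

  override def flush(): Unit = timed(underlying.flush())

  override def close(): Unit = timed(underlying.close())
}

object TimedOutputStreamDemo {
  def main(args: Array[String]): Unit = {
    var writeTimeNanos = 0L
    val sink = new ByteArrayOutputStream()
    val out = new TimedOutputStream(sink, elapsed => writeTimeNanos += elapsed)
    out.write("shuffle".getBytes("UTF-8"))
    out.flush()
    out.close()
    println(s"bytes=${sink.size()} writeTimeNanos=$writeTimeNanos")
  }
}
```

Because every call adds its own elapsed time, the accumulated value covers only the time spent inside the stream operations, not the time spent producing the records.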

== [[incRecordsWritten]] Incrementing Shuffle Records Written Metrics -- incRecordsWritten Method

[source, scala]
----
incRecordsWritten(v: Long): Unit
----

incRecordsWritten adds v to the <<_recordsWritten, _recordsWritten>> internal accumulator.

[NOTE]
====
incRecordsWritten is used when:

  1. shuffle:ShuffleExternalSorter.md#writeSortedFile[ShuffleExternalSorter does writeSortedFile]

  2. storage:DiskBlockObjectWriter.md#recordWritten[DiskBlockObjectWriter does recordWritten]

  3. spark-history-server:JsonProtocol.md#taskMetricsFromJson[JsonProtocol creates TaskMetrics from JSON]
====
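As a rough illustration of how a writer might drive the record and byte counters together, the sketch below counts one record per write and reports bytes as the difference from the last reported position. `CountingRecordWriter` and its callbacks are hypothetical (they stand in for incRecordsWritten and incBytesWritten), and the delta-based reporting is an assumption of this sketch rather than a description of DiskBlockObjectWriter.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical writer; the callbacks stand in for incRecordsWritten and incBytesWritten.
final class CountingRecordWriter(
    incRecordsWritten: Long => Unit,
    incBytesWritten: Long => Unit) {

  private val buffer = new ByteArrayOutputStream()
  private var reportedPosition = 0L

  // Writes one record and counts it immediately.
  def write(record: Array[Byte]): Unit = {
    buffer.write(record, 0, record.length)
    incRecordsWritten(1L)
  }

  // Reports only the bytes written since the previous report.
  def updateBytesWritten(): Unit = {
    val position = buffer.size().toLong
    incBytesWritten(position - reportedPosition)
    reportedPosition = position
  }
}

object CountingRecordWriterDemo {
  def main(args: Array[String]): Unit = {
    var records = 0L
    var bytes = 0L
    val writer = new CountingRecordWriter(v => records += v, v => bytes += v)
    writer.write(Array[Byte](1, 2, 3))
    writer.write(Array[Byte](4, 5))
    writer.updateBytesWritten()
    println(s"records=$records bytes=$bytes")
  }
}
```

Reporting deltas fits the additive increment methods: calling `updateBytesWritten` more than once never counts the same bytes twice.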


Last update: 2020-10-06