

= TorrentBroadcast

TorrentBroadcast is a Broadcast that uses a BitTorrent-like protocol to distribute broadcast blocks.

.TorrentBroadcast -- Broadcasting using BitTorrent
image::sparkcontext-broadcast-bittorrent.png[align="center"]

When a broadcast variable is created (using SparkContext.broadcast) on the driver, a <<creating-instance, new TorrentBroadcast instance is created>>.

[source, scala]

// On the driver
val sc: SparkContext = ???
val anyScalaValue = ???
val b = sc.broadcast(anyScalaValue) // ← TorrentBroadcast is created

A broadcast variable is stored in the driver's BlockManager as a single value and, separately, as broadcast blocks (after it was <<blockifyObject, divided into blocks>>). The broadcast block size is the value of the spark.broadcast.blockSize Spark property.

.TorrentBroadcast puts broadcast and the chunks to driver's BlockManager
image::sparkcontext-broadcast-bittorrent-newBroadcast.png[align="center"]

NOTE: TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.

== [[creating-instance]] Creating Instance

TorrentBroadcast takes the following to be created:

  • [[obj]] Object (the value) to be broadcast
  • [[id]] ID

TorrentBroadcast is created when TorrentBroadcastFactory is requested for a new broadcast variable.

== [[_value]] Transient Lazy Broadcast Value

[source, scala]

_value: T

TorrentBroadcast uses the _value internal registry for the value that is <<readBroadcastBlock, read from the broadcast blocks>> on demand (and cached afterwards).

_value is a @transient private lazy val and so relies on two Scala language features:

  • It is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and has to be <<readBroadcastBlock, read again>> afterwards)

  • It is lazily instantiated when first requested and cached afterwards
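The effect of combining the two features can be seen in a minimal, Spark-free sketch. `TransientLazyDemo`, `Holder`, `loads` and `roundTrip` are hypothetical names introduced for illustration only, and plain Java serialization stands in for whatever serializer Spark uses to ship the broadcast.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientLazyDemo {
  // Counts how often the lazy value is (re)computed in this JVM.
  var loads: Int = 0

  // Hypothetical stand-in for a broadcast: only `id` is serialized.
  class Holder(val id: Long) extends java.io.Serializable {
    // Not written to the wire (@transient) and computed on first access (lazy).
    @transient private lazy val _value: String = {
      loads += 1
      s"value-of-$id"
    }
    def getValue(): String = _value
  }

  // Serializes and deserializes a Holder, as if it were sent to another process.
  def roundTrip(h: Holder): Holder = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(h) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Holder] finally in.close()
  }
}
```

Calling `getValue()` twice on the same `Holder` computes the value once, while the first `getValue()` on a deserialized copy computes it again, which mirrors why an executor has to read the broadcast value afresh.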

== [[numBlocks]] numBlocks Internal Value

TorrentBroadcast uses the numBlocks internal value for the total number of blocks it contains. It is <<writeBlocks, initialized>> when TorrentBroadcast is <<creating-instance, created>>.

== [[getValue]] Getting Value of Broadcast Variable

[source, scala]

def getValue(): T

getValue returns the <<_value, _value>>.

getValue is part of the Broadcast abstraction.

== [[broadcastId]] BroadcastBlockId

TorrentBroadcast uses a BroadcastBlockId for...FIXME

== [[readBroadcastBlock]] readBroadcastBlock Internal Method

[source, scala]

readBroadcastBlock(): T

readBroadcastBlock uses the SparkEnv to access the BroadcastManager, which is requested for the cached broadcast values.

readBroadcastBlock looks up the <<broadcastId, BroadcastBlockId>> in the cached broadcast values and returns the value if found.

If not found, readBroadcastBlock requests the SparkEnv for the SparkConf and uses it to <<setConf, setConf>>.

readBroadcastBlock uses the SparkEnv to access the BlockManager.

readBroadcastBlock requests the BlockManager for getLocalValues.

If the broadcast data is available locally, readBroadcastBlock <<releaseLock, releases the lock>> for the broadcast and returns the value.

If however the broadcast data was not found locally, you should see the following INFO message in the logs:


Started reading broadcast variable [id]

readBroadcastBlock <<readBlocks, reads the blocks>> of the broadcast.

You should see the following INFO message in the logs:


Reading broadcast variable [id] took [usedTimeMs]

readBroadcastBlock <<unBlockifyObject, unblockifies the collection of ByteBuffer blocks>>.

NOTE: readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.

readBroadcastBlock stores the broadcast variable in the local BlockManager with the MEMORY_AND_DISK storage level. If storing the broadcast variable fails, readBroadcastBlock throws a SparkException:


Failed to store [broadcastId] in BlockManager

The broadcast variable is returned.

readBroadcastBlock is used when TorrentBroadcast is requested for the <<_value, broadcast value>>.
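The lookup order described in this section can be modelled without Spark. The sketch below is only an illustration: `Env`, `cachedValues`, `localValues`, `remoteReads` and `fetchAndAssemble` are hypothetical stand-ins (for the BroadcastManager cache, the local BlockManager and the read-and-unblockify step), and adding the value to the cache after a miss is an assumption of the sketch rather than something stated above.

```scala
import scala.collection.mutable

object ReadBroadcastBlockSketch {
  // fetchAndAssemble models "read the blocks and unblockify them".
  final class Env[T](fetchAndAssemble: () => T) {
    val cachedValues = mutable.Map.empty[String, T] // stand-in for the cached broadcast values
    val localValues = mutable.Map.empty[String, T]  // stand-in for the local BlockManager
    var remoteReads = 0                             // how often the blocks had to be read

    def readBroadcastBlock(broadcastId: String): T =
      cachedValues.get(broadcastId) match {
        case Some(cached) => cached // 1. cached broadcast values
        case None =>
          val value = localValues.get(broadcastId) match {
            case Some(local) => local // 2. value available locally
            case None =>
              // 3. read the blocks, reassemble them, store the value locally
              remoteReads += 1
              val assembled = fetchAndAssemble()
              localValues(broadcastId) = assembled
              assembled
          }
          cachedValues(broadcastId) = value // assumption: cache for later lookups
          value
      }
  }
}
```

With this model, a second `readBroadcastBlock` call for the same ID is served without reading the blocks again.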

== [[setConf]] setConf Internal Method

[source, scala]

setConf(conf: SparkConf): Unit

setConf uses the given SparkConf to set the compression codec and the block size.

Internally, setConf reads the spark.broadcast.compress configuration property and, if enabled (which it is by default), sets a CompressionCodec (as the internal compressionCodec property).

setConf also reads the spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).

setConf is executed when TorrentBroadcast is <<creating-instance, created>> or <<readBroadcastBlock, requested to read the broadcast value>>.
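As a rough, Spark-free model of what setConf derives from the configuration, the sketch below reads the two properties from a plain `Map`. `SetConfSketch`, `BroadcastSettings` and `sizeToBytes` are hypothetical names, the size parser is deliberately simplified, and the `4m` default (with sizes interpreted as KiB when no unit is given) is an assumption taken from Spark's configuration documentation rather than from this page.

```scala
object SetConfSketch {
  // What the sketch extracts from the configuration.
  final case class BroadcastSettings(compress: Boolean, blockSizeBytes: Int)

  // Simplified size parser (assumption): "m" = MiB, "k" = KiB, no unit = KiB.
  def sizeToBytes(size: String): Int = {
    val s = size.trim.toLowerCase
    if (s.endsWith("m")) s.dropRight(1).toInt * 1024 * 1024
    else if (s.endsWith("k")) s.dropRight(1).toInt * 1024
    else s.toInt * 1024
  }

  // A plain Map stands in for SparkConf here.
  def setConf(conf: Map[String, String]): BroadcastSettings =
    BroadcastSettings(
      // Compression is enabled unless explicitly switched off.
      compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean,
      // Assumed default block size of 4 MiB.
      blockSizeBytes = sizeToBytes(conf.getOrElse("spark.broadcast.blockSize", "4m")))
}
```

For example, `SetConfSketch.setConf(Map("spark.broadcast.blockSize" -> "512k"))` yields compression enabled and a block size of 524288 bytes.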

== [[writeBlocks]] Storing Broadcast and Blocks to BlockManager

[source, scala]

writeBlocks(value: T): Int

writeBlocks stores the given value (that is the <<obj, broadcast value>>) and its blocks in the BlockManager. writeBlocks returns the <<numBlocks, number of blocks>> the value was divided into.

Internally, writeBlocks uses the SparkEnv to access the BlockManager.

writeBlocks requests the BlockManager to putSingle the value (with the MEMORY_AND_DISK storage level).

writeBlocks <<blockifyObject, blockifies>> the given value (using the block size, the system Serializer, and the optional compression codec).

For every block, writeBlocks creates a BroadcastBlockId for the <<id, broadcast ID>> and the piece[index] identifier, and requests the BlockManager to putBytes (with the MEMORY_AND_DISK_SER storage level).

In short, the entire broadcast value is stored in the local BlockManager with the MEMORY_AND_DISK storage level, whereas the blocks are stored with the MEMORY_AND_DISK_SER storage level.

With <> writeBlocks...FIXME

In case of an error while storing the value or the blocks, writeBlocks throws a SparkException:


Failed to store [pieceId] of [broadcastId] in local BlockManager

writeBlocks is used when TorrentBroadcast is <<creating-instance, created>> for the <<numBlocks, numBlocks>> internal value (that happens on the driver only).
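The storage steps of writeBlocks can be sketched against a toy block store. Everything here is illustrative: `FakeBlockManager` is a hypothetical stand-in for the BlockManager, storage levels are plain strings, the already-serialized bytes are passed in directly, and the `broadcast_<id>` / `broadcast_<id>_piece<index>` block names are an assumption about how BroadcastBlockId renders its name.

```scala
import scala.collection.mutable

object WriteBlocksSketch {
  // Hypothetical stand-in for a BlockManager: block name -> (storage level, payload).
  final class FakeBlockManager {
    val blocks = mutable.LinkedHashMap.empty[String, (String, Any)]
    def put(name: String, level: String, payload: Any): Boolean = {
      blocks.update(name, (level, payload))
      true
    }
  }

  // Stores the whole value plus its pieces and returns the number of pieces.
  def writeBlocks(bm: FakeBlockManager, id: Long, value: AnyRef,
                  serialized: Array[Byte], blockSize: Int): Int = {
    val broadcastId = s"broadcast_$id"
    // The entire value is stored deserialized.
    if (!bm.put(broadcastId, "MEMORY_AND_DISK", value))
      throw new IllegalStateException(s"Failed to store $broadcastId in BlockManager")
    // The serialized form is cut into pieces of at most blockSize bytes.
    val pieces = serialized.grouped(blockSize).toArray
    pieces.zipWithIndex.foreach { case (piece, index) =>
      val pieceId = s"${broadcastId}_piece$index"
      if (!bm.put(pieceId, "MEMORY_AND_DISK_SER", piece))
        throw new IllegalStateException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
    }
    pieces.length
  }
}
```

Ten serialized bytes with a block size of 4 end up as three pieces next to the single whole-value entry.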

== [[blockifyObject]] Chunking Broadcast Variable Into Blocks

[source, scala]

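blockifyObject[T: ClassTag](
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]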

blockifyObject divides (aka blockifies) the input obj value into blocks (ByteBuffer chunks). blockifyObject uses the given Serializer to write the value in a serialized format to a ChunkedByteBufferOutputStream of the given blockSize size, with the optional CompressionCodec.

blockifyObject is used when TorrentBroadcast is requested to <<writeBlocks, writeBlocks>>.
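The chunking idea, together with its inverse, can be illustrated with JDK classes only. This is a sketch under stated assumptions, not Spark's implementation: Java serialization stands in for the Serializer, GZIP stands in for the CompressionCodec, and the bytes are buffered in full and then sliced instead of being written through a ChunkedByteBufferOutputStream. `BlockifySketch`, `blockify` and `unblockify` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream, SequenceInputStream}
import java.nio.ByteBuffer
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object BlockifySketch {
  // Serializes obj (optionally compressed) and cuts the bytes into blocks
  // of at most blockSize bytes each.
  def blockify(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    try out.writeObject(obj) finally out.close() // close() also finishes the GZIP stream
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // Concatenates the blocks in order and reads the object back.
  def unblockify(blocks: Array[ByteBuffer], compress: Boolean): AnyRef = {
    val streams = new java.util.Vector[InputStream]()
    blocks.foreach { b =>
      streams.add(new ByteArrayInputStream(b.array(), b.arrayOffset() + b.position(), b.remaining()))
    }
    val joined: InputStream = new SequenceInputStream(streams.elements())
    val source: InputStream = if (compress) new GZIPInputStream(joined) else joined
    val in = new ObjectInputStream(source)
    try in.readObject() finally in.close()
  }
}
```

A round trip such as `BlockifySketch.unblockify(BlockifySketch.blockify(value, 1024, compress = true), compress = true)` returns an object equal to `value`, provided the blocks are passed back in their original order.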

== [[doUnpersist]] doUnpersist Method

[source, scala]

doUnpersist(blocking: Boolean): Unit

doUnpersist <<unpersist, removes the broadcast blocks>> from executors only.

NOTE: doUnpersist is part of the Broadcast Variable Contract and is executed from the unpersist method.

== [[doDestroy]] doDestroy Method

[source, scala]

doDestroy(blocking: Boolean): Unit

doDestroy <<unpersist, removes the broadcast blocks from all nodes in a Spark application>>, i.e. the driver and executors.

NOTE: doDestroy is executed when Broadcast removes the persisted data and metadata related to a broadcast variable.

== [[unpersist]] unpersist Utility

[source, scala]

unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit

unpersist removes all broadcast blocks from executors and, with the given removeFromDriver flag enabled, from the driver.

When executed, unpersist prints out the following DEBUG message in the logs:


Unpersisting TorrentBroadcast [id]

unpersist requests the BlockManagerMaster to remove the id broadcast.

NOTE: unpersist uses SparkEnv to get the BlockManagerMaster (through the blockManager property).

unpersist is used when:

  • TorrentBroadcast is requested to <<doUnpersist, doUnpersist>> and <<doDestroy, doDestroy>>

  • TorrentBroadcastFactory is requested to unbroadcast

== [[readBlocks]] Reading Broadcast Blocks

[source, scala]

readBlocks(): Array[BlockData]

readBlocks creates a local array of BlockData with <<numBlocks, numBlocks>> elements (that is later filled in and returned).

readBlocks uses the SparkEnv to access the BlockManager (that is later used to fetch local or remote blocks).

For every block (visited in random order of block IDs between 0 and <<numBlocks, numBlocks>>), readBlocks creates a BroadcastBlockId for the <<id, id>> (of the broadcast variable) and the chunk identified by the piece prefix followed by the block ID.

readBlocks prints out the following DEBUG message to the logs:


Reading piece [pieceId] of [broadcastId]

readBlocks first tries to look up the piece locally by requesting the BlockManager to getLocalBytes and, if found, stores the reference in the local block array (for the piece ID) and <<releaseLock, releases the lock>> for the chunk.

If not found locally, readBlocks requests the BlockManager to getRemoteBytes.


readBlocks throws a SparkException for blocks neither available locally nor remotely:


Failed to get [pieceId] of [broadcastId]

readBlocks is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>>.
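The random-order, local-then-remote lookup of readBlocks can be modelled with two maps. This is a simplified sketch: `Store`, `local` and `remote` are hypothetical stand-ins for the local BlockManager and for whatever other nodes hold a piece, raw byte arrays replace BlockData, and the `broadcast_<id>_piece<index>` naming is an assumption about how the piece IDs are rendered.

```scala
import scala.util.Random

object ReadBlocksSketch {
  // Hypothetical block store: block name -> bytes.
  type Store = Map[String, Array[Byte]]

  // Piece name: the broadcast ID followed by the "piece" prefix and the block index.
  def pieceId(broadcastId: Long, index: Int): String =
    s"broadcast_${broadcastId}_piece$index"

  def readBlocks(broadcastId: Long, numBlocks: Int, local: Store, remote: Store): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    // Visit the pieces in random order so that readers do not all ask for piece 0 first.
    for (index <- Random.shuffle(Seq.range(0, numBlocks))) {
      val id = pieceId(broadcastId, index)
      // Try the local store first, then the remote one, else fail.
      blocks(index) = local.get(id).orElse(remote.get(id)).getOrElse(
        throw new IllegalStateException(s"Failed to get $id of broadcast_$broadcastId"))
    }
    blocks
  }
}
```

Although the pieces are fetched in random order, each one is written to its own index, so the returned array is always in piece order.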

== [[unBlockifyObject]] unBlockifyObject Utility

[source, scala]

unBlockifyObject[T: ClassTag](
  blocks: Array[InputStream],
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): T


unBlockifyObject is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>>.

== [[releaseLock]] releaseLock Internal Method

[source, scala]

releaseLock(blockId: BlockId): Unit


releaseLock is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>> and <<readBlocks, readBlocks>>.

== [[logging]] Logging

Enable ALL logging level for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.

Add the following line to conf/


Refer to Logging.

Last update: 2020-10-06