TorrentBroadcast — BroadcastFactory With BitTorrent-Like Protocol For Block Distribution

TorrentBroadcast is a BroadcastFactory that uses a BitTorrent-like protocol for block distribution (that only happens when tasks access broadcast variables on executors).

sparkcontext broadcast bittorrent
Figure 1. TorrentBroadcast - broadcasting using BitTorrent

TorrentBroadcast is created exclusively when TorrentBroadcastFactory is requested to newBroadcast.

// On the driver
val sc: SparkContext = ???
val anyScalaValue = ???
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created

A broadcast variable is stored on the driver’s BlockManager as a single value and separately as broadcast blocks (after it was divided into broadcast blocks, i.e. blockified). The broadcast block size is the value of spark.broadcast.blockSize Spark property.

sparkcontext broadcast bittorrent newBroadcast
Figure 2. TorrentBroadcast puts broadcast and the chunks to driver’s BlockManager
TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
TorrentBroadcast belongs to org.apache.spark.broadcast package.

TorrentBroadcast uses _value internal registry for the value that is computed by readBroadcastBlock when first requested.

_value: T
_value is a @transient private lazy val which means that it is not serialized to be sent remotely and instantiated when first requested.

TorrentBroadcast uses a BroadcastBlockId for…​FIXME

Enable INFO or DEBUG logging levels for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

unBlockifyObject Method


releaseLock Method


Getting Value of Broadcast Variable — getValue Method

def getValue(): T

getValue returns the value of a broadcast variable.

getValue is part of the Broadcast Variable Contract and is the only way to access the value of a broadcast variable.

Internaly, getValue reads the internal _value that, once accessed, reads broadcast blocks from the local or remote BlockManagers.

The internal _value is transient and lazy, i.e. it is not preserved when serialized and (re)created only when requested, respectively. That "trick" allows for serializing broadcast values on the driver before they are transferred to executors over the wire.

readBroadcastBlock Internal Method

readBroadcastBlock(): T

Internally, readBroadcastBlock sets the SparkConf

The current SparkConf is available using SparkEnv.get.conf.

readBroadcastBlock requests the local BlockManager for values of the broadcast.

The current BlockManager is available using SparkEnv.get.blockManager.

If the broadcast was available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.

If however the broadcast was not found locally, you should see the following INFO message in the logs:

INFO Started reading broadcast variable [id]

readBroadcastBlock reads blocks (as chunks) of the broadcast.

You should see the following INFO message in the logs:

INFO Reading broadcast variable [id] took [usedTimeMs]
readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together as one single broadcast variable.

readBroadcastBlock stores the broadcast variable with MEMORY_AND_DISK storage level to the local BlockManager. When storing the broadcast variable was unsuccessful, a SparkException is thrown.

Failed to store [broadcastId] in BlockManager

The broadcast variable is returned.

readBroadcastBlock is used exclusively when TorrentBroadcast is requested for the value.

setConf Internal Method

setConf(conf: SparkConf): Unit

setConf uses the input conf SparkConf to set compression codec and the block size.

Internally, setConf reads spark.broadcast.compress Spark property and if enabled (which it is by default) sets a CompressionCodec (as an internal compressionCodec property).

setConf also reads spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).

Storing Broadcast and Blocks in Local BlockManager — writeBlocks Internal Method

writeBlocks(value: T): Int

writeBlocks stores the broadcast’s value and blocks in the driver’s BlockManager. It returns the number of the broadcast blocks the broadcast was divided into.

Internally, writeBlocks stores the block for value broadcast to the local BlockManager (using a new BroadcastBlockId, value, MEMORY_AND_DISK storage level and without telling the driver).

If storing the broadcast block fails, you should see the following SparkException in the logs:

Failed to store [broadcastId] in BlockManager

writeBlocks divides value into blocks (of spark.broadcast.blockSize size) using the Serializer and an optional CompressionCodec (enabled by spark.broadcast.compress). Every block gets its own BroadcastBlockId (with piece and an index) that is wrapped inside a ChunkedByteBuffer. Blocks are stored in the local BlockManager (using the piece block id, MEMORY_AND_DISK_SER storage level and informing the driver).

The entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, and the pieces with MEMORY_AND_DISK_SER storage level.

If storing any of the broadcast pieces fails, you should see the following SparkException in the logs:

Failed to store [pieceId] of [broadcastId] in local BlockManager
writeBlocks is used exclusively when TorrentBroadcast is created (that happens on the driver only).

Chunking Broadcast Into Blocks — blockifyObject Method

  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]

blockifyObject divides (aka blockifies) the input obj broadcast variable into blocks (of ByteBuffer). blockifyObject uses the input serializer Serializer to write obj in a serialized format to a ChunkedByteBufferOutputStream (of blockSize size) with the optional CompressionCodec.

doUnpersist Method

doUnpersist(blocking: Boolean): Unit
doUnpersist is part of the Broadcast Variable Contract and is executed from unpersist method.

unpersist Internal Method

  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit

unpersist removes all broadcast blocks from executors and possibly the driver (only when removeFromDriver flag is enabled).

unpersist belongs to TorrentBroadcast private object and is executed when TorrentBroadcast unpersists a broadcast variable and removes a broadcast variable completely.

When executed, you should see the following DEBUG message in the logs:

DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast [id]
unpersist uses SparkEnv to get the BlockManagerMaster (through blockManager property).

Creating TorrentBroadcast Instance

TorrentBroadcast takes the following when created:

  • Object (the value) to be broadcast

  • ID

TorrentBroadcast initializes the internal registries and counters.

Reading Blocks — readBlocks Internal Method

readBlocks(): Array[BlockData]


readBlocks is used when…​FIXME