TorrentBroadcast

TorrentBroadcast is a Broadcast Variable that uses a BitTorrent-like protocol to distribute broadcast blocks.

Figure 1. TorrentBroadcast: Broadcasting using BitTorrent
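// On the driver (e.g. in spark-shell, where an active SparkContext already exists)
import org.apache.spark.SparkContext
val sc: SparkContext = SparkContext.getOrCreate()
val anyScalaValue = Map("spark" -> 1, "broadcast" -> 2) // any serializable value
val b = sc.broadcast(anyScalaValue) // <-- TorrentBroadcast is created
println(b.value) // accessing the broadcast value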

A broadcast variable is stored in the driver’s BlockManager as a single value and, separately, as broadcast blocks (after it has been divided into blocks, i.e. blockified). The broadcast block size is the value of the spark.broadcast.blockSize Spark property (default: 4m).

Figure 2. TorrentBroadcast puts the broadcast value and its chunks to the driver’s BlockManager

TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.

Creating Instance

TorrentBroadcast takes the following to be created:

  • Object (the value) to be broadcast

  • ID

TorrentBroadcast is created when TorrentBroadcastFactory is requested for a new broadcast variable.

Transient Lazy Broadcast Value

_value: T

TorrentBroadcast uses the _value internal registry for the value that is computed on demand (using readBroadcastBlock) and cached afterwards.

_value is a @transient private lazy val and uses two Scala language features:

  • It is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (so it has to be recomputed there)

  • It is lazily instantiated when first requested and cached afterwards
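The two Scala features can be observed outside Spark with a minimal sketch. LazyBroadcastSketch and its computations counter are hypothetical stand-ins (not Spark classes), and plain Java serialization mimics sending the object over the wire: only id travels, and the @transient lazy value is rebuilt on first access after deserialization.

```scala
import java.io._

// Hypothetical stand-in for TorrentBroadcast (not Spark's class):
// only `id` is serialized; `_value` is @transient and lazily (re)computed.
class LazyBroadcastSketch(val id: Long) extends Serializable {
  @transient private lazy val _value: String = {
    LazyBroadcastSketch.computations += 1 // count how often the value is built
    s"value of broadcast $id"
  }
  def getValue(): String = _value
}

object LazyBroadcastSketch {
  // Counter kept outside the serialized instance (demo only)
  var computations: Int = 0

  // Java-serialize and deserialize an object, like sending it over the wire
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new LazyBroadcastSketch(0L)
    onDriver.getValue()
    onDriver.getValue()
    println(computations) // 1: computed on first access, then cached

    val onExecutor = roundTrip(onDriver)
    onExecutor.getValue()
    println(computations) // 2: the transient value was not shipped, so it is recomputed
  }
}
```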

numBlocks Internal Value

TorrentBroadcast uses the numBlocks internal value for the total number of blocks it contains. It is initialized (using writeBlocks) when TorrentBroadcast is created.

Getting Value of Broadcast Variable

def getValue(): T

getValue returns the _value.

getValue is part of the Broadcast abstraction.

BroadcastBlockId

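TorrentBroadcast uses a BroadcastBlockId (for the ID of the broadcast variable) to store and look up the entire broadcast value in a BlockManager (see writeBlocks and readBroadcastBlock). The blocks of the broadcast value use BroadcastBlockIds with an additional piece[index] field.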
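The naming scheme can be sketched as follows, assuming it mirrors Spark's BroadcastBlockId: the whole value is named broadcast_[id] and every block appends its piece[index] field. BroadcastBlockIdSketch, forValue and forPiece are hypothetical names used for illustration only.

```scala
// Hypothetical re-creation of the broadcast block naming (not Spark's class):
// the whole value is "broadcast_<id>", each block is "broadcast_<id>_piece<i>".
final case class BroadcastBlockIdSketch(broadcastId: Long, field: String = "") {
  def name: String =
    "broadcast_" + broadcastId + (if (field.isEmpty) "" else "_" + field)
}

object BroadcastBlockIdSketch {
  // Block ID of the entire broadcast value
  def forValue(id: Long): BroadcastBlockIdSketch = BroadcastBlockIdSketch(id)

  // Block ID of the index-th piece (chunk) of the broadcast value
  def forPiece(id: Long, index: Int): BroadcastBlockIdSketch =
    BroadcastBlockIdSketch(id, "piece" + index)

  def main(args: Array[String]): Unit = {
    println(forValue(5L).name)    // broadcast_5
    println(forPiece(5L, 0).name) // broadcast_5_piece0
  }
}
```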

readBroadcastBlock Internal Method

readBroadcastBlock(): T

readBroadcastBlock uses the SparkEnv to access the BroadcastManager and requests it for the cached broadcast values.

readBroadcastBlock looks up the BroadcastBlockId in the cached broadcast values and returns the value if found.

If not found, readBroadcastBlock requests the SparkEnv for the SparkConf and calls setConf with it.

readBroadcastBlock uses the SparkEnv to access the BlockManager.

readBroadcastBlock requests the BlockManager to getLocalValues (for the BroadcastBlockId).

If the broadcast data was available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.

If, however, the broadcast data is not found locally, you should see the following INFO message in the logs:

Started reading broadcast variable [id]

readBroadcastBlock reads the blocks (chunks) of the broadcast using readBlocks.

You should see the following INFO message in the logs:

Reading broadcast variable [id] took [usedTimeMs]

readBroadcastBlock uses the current Serializer and the internal CompressionCodec to bring all the blocks together into a single broadcast value (using unBlockifyObject).

readBroadcastBlock stores the broadcast value in the local BlockManager with the MEMORY_AND_DISK storage level. If storing the broadcast value fails, readBroadcastBlock throws a SparkException:

Failed to store [broadcastId] in BlockManager

The broadcast variable is returned.

readBroadcastBlock is used when TorrentBroadcast is requested for the broadcast value.
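The lookup order of readBroadcastBlock can be sketched in plain Scala. This is a simplified, hypothetical model: ReadBroadcastSketch, cachedValues, localStore and readAndUnblockify are made-up stand-ins for the BroadcastManager cache, the local BlockManager, and readBlocks plus unBlockifyObject. Locking, storage levels and the SparkException on a failed store are left out, and caching the value on every path (via getOrElseUpdate) is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of readBroadcastBlock's lookup order (not Spark's API).
final class ReadBroadcastSketch(
    readAndUnblockify: Long => Any) { // stands in for readBlocks + unBlockifyObject

  val cachedValues = mutable.Map.empty[String, Any] // stands in for the BroadcastManager cache
  val localStore = mutable.Map.empty[String, Any]   // stands in for the local BlockManager

  def readBroadcastBlock(id: Long): Any = {
    val broadcastId = s"broadcast_$id"
    // 1) cached broadcast values; whatever is found or computed below gets cached
    cachedValues.getOrElseUpdate(broadcastId, {
      // 2) the whole value already in the local block store
      localStore.getOrElse(broadcastId, {
        // 3) read the blocks and bring them together as one value
        println(s"Started reading broadcast variable $id")
        val value = readAndUnblockify(id)
        localStore.put(broadcastId, value) // MEMORY_AND_DISK in Spark
        value
      })
    })
  }
}
```

Keeping the expensive block fetch behind two cheaper lookups is what makes repeated reads of the same broadcast variable on an executor cheap.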

setConf Internal Method

setConf(
  conf: SparkConf): Unit

setConf uses the given SparkConf to set the compression codec and the block size.

Internally, setConf reads the spark.broadcast.compress configuration property and, if enabled (the default), creates a CompressionCodec (as the internal compressionCodec property).

setConf also reads the spark.broadcast.blockSize Spark property and sets the block size (as the internal blockSize property).
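What setConf derives from the configuration can be modelled with a small, hypothetical sketch (SetConfSketch, BroadcastSettings and parseSizeToBytes are made-up names, not Spark's code). Assumptions: settings come as a plain Map instead of a SparkConf, only a subset of size suffixes is supported, and unsuffixed block sizes are read as KiB, mirroring how spark.broadcast.blockSize is declared. The defaults ("true" and "4m") match the ones described above.

```scala
// Hypothetical model of setConf's two settings (not Spark's code).
object SetConfSketch {
  final case class BroadcastSettings(compress: Boolean, blockSizeBytes: Int)

  private val kib = 1024L

  // Parse sizes such as "4m", "512k" or "1g" into bytes (no suffix means KiB here)
  def parseSizeToBytes(s: String): Long = {
    val (num, unit) = s.trim.toLowerCase.span(_.isDigit)
    val multiplier = unit match {
      case "" | "k" | "kb" => kib
      case "m" | "mb"      => kib * kib
      case "g" | "gb"      => kib * kib * kib
      case other => throw new IllegalArgumentException(s"Unsupported size unit: $other")
    }
    num.toLong * multiplier
  }

  def setConf(conf: Map[String, String]): BroadcastSettings = {
    val compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
    val blockSize = parseSizeToBytes(conf.getOrElse("spark.broadcast.blockSize", "4m"))
    BroadcastSettings(compress, blockSize.toInt)
  }
}
```

With an empty configuration, the sketch yields compression enabled and a 4194304-byte (4 MiB) block size.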

Storing Broadcast and Blocks to BlockManager

writeBlocks(
  value: T): Int

writeBlocks stores the given value (the broadcast value) and its blocks in the local BlockManager, and returns the number of blocks the broadcast value was divided into.

Internally, writeBlocks uses the SparkEnv to access the BlockManager.

writeBlocks requests the BlockManager to putSingle the value (with the MEMORY_AND_DISK storage level).

writeBlocks blockifies the given value (using blockifyObject with the block size, the system Serializer, and the optional compressionCodec).

For every block, writeBlocks creates a BroadcastBlockId for the broadcast variable ID and piece[index] identifier, and requests the BlockManager to putBytes (with the MEMORY_AND_DISK_SER storage level).

In other words, the entire broadcast value is stored in the local BlockManager with the MEMORY_AND_DISK storage level, whereas the blocks are stored with the MEMORY_AND_DISK_SER storage level.

With checksumEnabled, writeBlocks…FIXME

In case of an error while storing the value or the blocks, writeBlocks throws a SparkException:

Failed to store [pieceId] of [broadcastId] in local BlockManager

writeBlocks is used when TorrentBroadcast is created (to initialize the numBlocks internal value), which happens on the driver only.
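The storage layout that writeBlocks produces can be sketched with an in-memory map. WriteBlocksSketch and localStore are hypothetical stand-ins for the local BlockManager, Java serialization replaces Spark's Serializer, compression and checksums are omitted, and storage levels appear only as comments since a map cannot model them.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical in-memory model of writeBlocks (not Spark's API).
object WriteBlocksSketch {
  val localStore = mutable.Map.empty[String, Any] // stands in for the local BlockManager

  private def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Stores the whole value plus its pieces; returns the number of blocks
  def writeBlocks(id: Long, value: AnyRef, blockSize: Int): Int = {
    val broadcastId = s"broadcast_$id"
    localStore.put(broadcastId, value) // putSingle (MEMORY_AND_DISK in Spark)
    val blocks = serialize(value).grouped(blockSize).toArray // blockify, uncompressed here
    blocks.zipWithIndex.foreach { case (block, i) =>
      localStore.put(s"${broadcastId}_piece$i", block) // putBytes (MEMORY_AND_DISK_SER in Spark)
    }
    blocks.length
  }
}
```

Keeping both the deserialized value and the serialized pieces lets the driver use the value directly while serving pieces to executors.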

Chunking Broadcast Variable Into Blocks

blockifyObject[T](
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]

blockifyObject divides (aka blockifies) the input obj value into blocks (ByteBuffer chunks). blockifyObject uses the given Serializer to write the value in a serialized format to a ChunkedByteBufferOutputStream with chunks of the given blockSize, compressing the output with the optional CompressionCodec.

blockifyObject is used when TorrentBroadcast is requested to store itself as blocks in a local BlockManager (writeBlocks).
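A blockifyObject-like helper can be sketched with the JDK alone. The substitutions are assumptions of this sketch, not Spark's code: Java serialization replaces Spark's Serializer, GZIP replaces the CompressionCodec, and splitting a byte array into blockSize-sized ByteBuffers replaces ChunkedByteBufferOutputStream. BlockifySketch is a hypothetical name.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

// Hypothetical blockifyObject-like helper (not Spark's code).
object BlockifySketch {
  def blockifyObject(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    require(blockSize > 0, "blockSize must be positive")
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    try out.writeObject(obj) finally out.close() // closing also finishes the GZIP stream
    // every chunk holds blockSize bytes, except possibly the last one
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  def main(args: Array[String]): Unit = {
    val blocks = blockifyObject(Vector.tabulate(10000)(i => i), blockSize = 4096, compress = false)
    println(s"${blocks.length} blocks, last one with ${blocks.last.remaining} bytes")
  }
}
```

The number of blocks is the serialized (and possibly compressed) size divided by blockSize, rounded up, which is what numBlocks ends up holding.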

doUnpersist Method

doUnpersist(blocking: Boolean): Unit

doUnpersist is part of the Broadcast abstraction and is executed from the unpersist method. doUnpersist uses the unpersist utility (with removeFromDriver disabled).

unpersist Utility

unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit

unpersist removes all broadcast blocks from executors and, with the given removeFromDriver flag enabled, from the driver.

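When executed, unpersist prints out the following DEBUG message to the logs:

Unpersisting TorrentBroadcast [id]

unpersist uses the SparkEnv to access the BlockManagerMaster (through the blockManager property) and requests it to removeBroadcast (with the given id, removeFromDriver and blocking arguments).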
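The effect of unpersist can be modelled with a hypothetical sketch in which every node's block store is a set of block names (UnpersistSketch and the "driver" node name are made up; the real work happens through BlockManagerMaster). Both the whole value and its pieces are removed from executors, while the driver's blocks go only when removeFromDriver is enabled.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of unpersist's effect (not Spark's API).
object UnpersistSketch {
  val Driver = "driver"

  def unpersist(
      stores: Map[String, mutable.Set[String]],
      id: Long,
      removeFromDriver: Boolean): Unit = {
    val prefix = s"broadcast_$id"
    // the whole value ("broadcast_<id>") and its pieces ("broadcast_<id>_piece<i>")
    def belongsToBroadcast(block: String): Boolean =
      block == prefix || block.startsWith(prefix + "_")
    for ((node, blocks) <- stores if node != Driver || removeFromDriver)
      blocks --= blocks.filter(b => belongsToBroadcast(b))
  }
}
```

Matching on the "_" separator keeps broadcast_10 intact when broadcast_1 is unpersisted.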

unpersist is used when:

Reading Broadcast Blocks

readBlocks(): Array[BlockData]

readBlocks creates a local array of numBlocks BlockDatas (that is later filled in and returned).

readBlocks uses the SparkEnv to access BlockManager (that is later used to fetch local or remote blocks).

For every block (with block IDs from 0 until numBlocks visited in a random order), readBlocks creates a BroadcastBlockId for the id of the broadcast variable and the piece field (the piece prefix followed by the block ID, e.g. piece0).

readBlocks prints out the following DEBUG message to the logs:

Reading piece [pieceId] of [broadcastId]

readBlocks first tries to look up the piece locally by requesting the BlockManager to getLocalBytes. If found, readBlocks stores the reference in the local block array (at the piece ID) and releases the lock on the chunk (releaseLock).

If not found locally, readBlocks requests the BlockManager to getRemoteBytes.

readBlocks…FIXME

readBlocks throws a SparkException for blocks neither available locally nor remotely:

Failed to get [pieceId] of [broadcastId]

readBlocks is used when TorrentBroadcast is requested to readBroadcastBlock.
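The local-then-remote lookup of readBlocks can be sketched as follows. ReadBlocksSketch is hypothetical: a mutable map stands in for the local BlockManager, a function stands in for getRemoteBytes, and IllegalStateException replaces SparkException. Storing a remotely fetched piece locally (so this node could serve it to others, in the BitTorrent spirit) is an assumption of the sketch, not something the text above states.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical model of readBlocks (not Spark's API).
object ReadBlocksSketch {
  def readBlocks(
      id: Long,
      numBlocks: Int,
      local: mutable.Map[String, Array[Byte]],  // stands in for getLocalBytes
      remote: String => Option[Array[Byte]],    // stands in for getRemoteBytes
      random: Random = new Random()): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    val broadcastId = s"broadcast_$id"
    // visit pieces in random order so that nodes do not all fetch the same piece first
    for (pid <- random.shuffle((0 until numBlocks).toList)) {
      val pieceId = s"${broadcastId}_piece$pid"
      println(s"Reading piece $pieceId of $broadcastId") // a DEBUG message in Spark
      val bytes = local.get(pieceId).orElse {
        remote(pieceId).map { b => local.put(pieceId, b); b } // keep a local copy (assumption)
      }.getOrElse(throw new IllegalStateException(s"Failed to get $pieceId of $broadcastId"))
      blocks(pid) = bytes // blocks stay in piece order regardless of fetch order
    }
    blocks
  }
}
```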

unBlockifyObject Utility

unBlockifyObject[T: ClassTag](
  blocks: Array[InputStream],
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): T

unBlockifyObject…FIXME

unBlockifyObject is used when TorrentBroadcast is requested to readBroadcastBlock.
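One plausible way to reverse blockification is sketched below, under the same assumptions as a JDK-only blockify: Java serialization instead of Spark's Serializer and GZIP instead of the CompressionCodec. UnblockifySketch and its compressed flag are hypothetical; the block streams are concatenated in order, optionally decompressed, and deserialized as one object.

```scala
import java.io.{InputStream, ObjectInputStream, SequenceInputStream}
import java.util.zip.GZIPInputStream
import scala.jdk.CollectionConverters._

// Hypothetical unBlockifyObject-like helper (not Spark's code).
object UnblockifySketch {
  def unBlockifyObject[T](blocks: Array[InputStream], compressed: Boolean): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    // read the blocks one after another as if they were a single stream
    val joined: InputStream = new SequenceInputStream(blocks.iterator.asJavaEnumeration)
    val source: InputStream = if (compressed) new GZIPInputStream(joined) else joined
    val in = new ObjectInputStream(source)
    try in.readObject().asInstanceOf[T] finally in.close() // closes all block streams
  }
}
```

Because the streams are concatenated, block boundaries do not need to align with anything in the serialized format.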

releaseLock Internal Method

releaseLock(
  blockId: BlockId): Unit

releaseLock…FIXME

releaseLock is used when TorrentBroadcast is requested to readBroadcastBlock and readBlocks.

Logging

Enable ALL logging level for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL

Refer to Logging.