TorrentBroadcast is a Broadcast Variable that uses a BitTorrent-like protocol to distribute broadcast blocks.
When a broadcast variable is created on the driver (using SparkContext.broadcast), a new instance of TorrentBroadcast is created.
    // On the driver
    import org.apache.spark.SparkContext

    val sc: SparkContext = ???           // an active SparkContext
    val anyScalaValue: Seq[Int] = ???    // any value to broadcast
    val b = sc.broadcast(anyScalaValue)  // <-- TorrentBroadcast is created
A broadcast variable is stored in the driver’s BlockManager twice: once as a single value and once as broadcast blocks (after it has been divided into broadcast blocks, i.e. blockified). The broadcast block size is the value of the spark.broadcast.blockSize Spark property (default: 4m).
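As a rough illustration of blockification, the number of broadcast blocks is the size of the serialized (and possibly compressed) value divided by the block size, rounded up. The `BlockCount.numBlocks` helper below is hypothetical and only does this arithmetic; it is not part of Spark.

```scala
// Hypothetical helper (not Spark's API): how many broadcast blocks a
// serialized value of `serializedSize` bytes yields for a block size in bytes.
object BlockCount {
  def numBlocks(serializedSize: Long, blockSize: Long): Long = {
    require(blockSize > 0, "blockSize must be positive")
    // Ceiling division: a partially filled last block still counts as a block
    (serializedSize + blockSize - 1) / blockSize
  }

  def main(args: Array[String]): Unit = {
    val fourMiB = 4L * 1024 * 1024 // 4m, the default spark.broadcast.blockSize
    println(numBlocks(10L * 1024 * 1024, fourMiB)) // 3
    println(numBlocks(fourMiB, fourMiB))           // 1
  }
}
```

A 10 MiB serialized value therefore ends up as two full 4 MiB blocks and one 2 MiB block.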
TorrentBroadcast-based broadcast variables are created using TorrentBroadcastFactory.
TorrentBroadcast takes the following to be created:
TorrentBroadcast is created when TorrentBroadcastFactory is requested for a new broadcast variable.
_value is an internal registry for the value that is computed on demand (and cached afterwards).
_value is a @transient private lazy val and so uses two Scala language features:
@transient: it is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and has to be re-computed afterwards)
lazy: it is instantiated when first requested and cached afterwards
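The effect of combining the two features can be seen with plain Java serialization. The sketch below assumes Scala 2.12/2.13 semantics for `@transient lazy val`; `TransientLazyHolder` and `TransientLazyDemo` are hypothetical names, not Spark classes. The value is computed once on the "driver" side, is not written out when the object is serialized, and is recomputed on first access after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for TorrentBroadcast: it only demonstrates the
// @transient lazy val pattern and is not Spark's class.
class TransientLazyHolder(val id: Long) extends Serializable {
  @transient private lazy val _value: String = {
    TransientLazyHolder.computations += 1 // count every (re)computation
    s"value-for-$id"
  }
  def value: String = _value
}

object TransientLazyHolder {
  var computations = 0
}

object TransientLazyDemo {
  // Round-trip an object through plain Java serialization
  def roundTrip[A](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new TransientLazyHolder(0L)
    println(onDriver.value)                   // computed on first access
    println(onDriver.value)                   // cached, not recomputed
    val onExecutor = roundTrip(onDriver)      // _value is not serialized
    println(onExecutor.value)                 // recomputed after deserialization
    println(TransientLazyHolder.computations) // 2
  }
}
```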
TorrentBroadcast uses a BroadcastBlockId for…FIXME
readBroadcastBlock looks up the BroadcastBlockId in the cached broadcast values and returns it if found.
If the value is not cached, readBroadcastBlock requests the BlockManager to getLocalValues for the BroadcastBlockId.
If the broadcast data was available locally, readBroadcastBlock releases a lock for the broadcast and returns the value.
If however the broadcast data was not found locally, you should see the following INFO message in the logs:
Started reading broadcast variable [id]
readBroadcastBlock reads the blocks (chunks) of the broadcast variable using readBlocks.
You should see the following INFO message in the logs:
Reading broadcast variable [id] took [usedTimeMs]
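readBroadcastBlock unblockifies the collection of blocks back into the broadcast value (using unBlockifyObject).
readBroadcastBlock uses the current Serializer (and the compression codec, if any) to do so.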
readBroadcastBlock stores the broadcast variable with MEMORY_AND_DISK storage level in the local BlockManager. If storing the broadcast variable fails, readBroadcastBlock throws a SparkException:
Failed to store [broadcastId] in BlockManager
The broadcast variable is returned.
readBroadcastBlock is used when TorrentBroadcast is requested for the broadcast value.
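The readBroadcastBlock steps above can be modeled roughly as follows. `SimpleBlockStore`, `BroadcastReader`, `readPieces` and `unblockify` are hypothetical stand-ins (for the local BlockManager, readBlocks and unBlockifyObject), and `IllegalStateException` replaces SparkException so the sketch has no Spark dependency. Locking and storage levels are left out.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the local BlockManager (not Spark's API).
final class SimpleBlockStore {
  private val values = mutable.Map.empty[String, Any]
  def getLocalValue(blockId: String): Option[Any] = values.get(blockId)
  // An in-memory map never fails here; a real BlockManager can
  def putValue(blockId: String, value: Any): Boolean = { values(blockId) = value; true }
}

// Hypothetical model of the readBroadcastBlock flow.
final class BroadcastReader[T](
    broadcastId: Long,
    store: SimpleBlockStore,
    cache: mutable.Map[Long, Any],        // cached broadcast values
    readPieces: () => Array[Array[Byte]], // plays the role of readBlocks
    unblockify: Array[Array[Byte]] => T) { // plays the role of unBlockifyObject

  private val blockId = s"broadcast_$broadcastId"

  def readBroadcastBlock(): T = cache.get(broadcastId) match {
    case Some(v) => v.asInstanceOf[T] // 1. already cached
    case None =>
      store.getLocalValue(blockId) match {
        case Some(v) => // 2. the whole value is available locally
          cache(broadcastId) = v
          v.asInstanceOf[T]
        case None => // 3. read the pieces, rebuild the value, store it locally
          val value = unblockify(readPieces())
          if (!store.putValue(blockId, value))
            throw new IllegalStateException(s"Failed to store $blockId in BlockManager")
          cache(broadcastId) = value
          value
      }
  }
}
```

Only the first call on a node pays for reading the pieces; later calls are served from the cache or the local store.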
setConf(conf: SparkConf): Unit
setConf uses the given SparkConf to set the compression codec and the block size.
The block size is the value of the spark.broadcast.blockSize Spark property.
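spark.broadcast.blockSize is documented as a size in KiB unless a unit is given, so the default 4m means 4 MiB. The hypothetical `BlockSizeSetting.toBytes` below sketches how such a setting maps to a block size in bytes; it handles only the k, m and g suffixes and is not Spark's own size parser.

```scala
// Hypothetical helper, not Spark's parser: turns "4m", "512k" or "512"
// (KiB when no unit is given) into a block size in bytes.
object BlockSizeSetting {
  private val SizePattern = """(?i)\s*(\d+)\s*([kmg]?)\s*""".r

  def toBytes(setting: String): Int = setting match {
    case SizePattern(num, unit) =>
      val kib = unit.toLowerCase match {
        case "" | "k" => num.toLong
        case "m"      => num.toLong * 1024
        case "g"      => num.toLong * 1024 * 1024
      }
      val bytes = kib * 1024
      // Blocks are addressed with Int sizes, so keep the result in Int range
      require(bytes > 0 && bytes <= Int.MaxValue, s"Block size out of range: $setting")
      bytes.toInt
    case _ =>
      throw new IllegalArgumentException(s"Invalid size: $setting")
  }
}
```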
writeBlocks(value: T): Int
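writeBlocks requests the BlockManager to putSingle the entire broadcast value (with MEMORY_AND_DISK storage level). writeBlocks then blockifies the value (using blockifyObject) and stores every block in the local BlockManager with MEMORY_AND_DISK_SER storage level. writeBlocks returns the number of blocks.
In other words, the entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, whereas the blocks are stored with MEMORY_AND_DISK_SER storage level.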
With checksumEnabled writeBlocks…FIXME
In case of an error while storing the value or the blocks, writeBlocks throws a SparkException:
Failed to store [pieceId] of [broadcastId] in local BlockManager
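A rough model of writeBlocks: store the whole value once, then store every serialized piece under its own block name. `LocalStoreModel` and `WriteBlocksSketch` are hypothetical; Java serialization stands in for Spark's Serializer, and the per-piece Adler32 checksum is this sketch's own assumption, included only to show where checksums would be computed.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.Adler32
import scala.collection.mutable

// Hypothetical stand-in for the local BlockManager (not Spark's API).
final class LocalStoreModel {
  val values = mutable.Map.empty[String, Any]         // whole value (cf. MEMORY_AND_DISK)
  val bytes  = mutable.Map.empty[String, Array[Byte]] // pieces (cf. MEMORY_AND_DISK_SER)
}

object WriteBlocksSketch {
  // Serialize with Java serialization and cut into pieces of at most blockSize bytes
  def blockify(value: Any, blockSize: Int): Array[Array[Byte]] = {
    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    oos.writeObject(value)
    oos.close()
    out.toByteArray.grouped(blockSize).toArray
  }

  def checksum(piece: Array[Byte]): Int = {
    val adler = new Adler32()
    adler.update(piece, 0, piece.length)
    adler.getValue.toInt
  }

  // Returns the number of pieces and, when enabled, one checksum per piece
  def writeBlocks(store: LocalStoreModel, broadcastId: Long, value: Any,
                  blockSize: Int, checksumEnabled: Boolean): (Int, Option[Array[Int]]) = {
    store.values(s"broadcast_$broadcastId") = value // the entire value
    val pieces = blockify(value, blockSize)
    pieces.zipWithIndex.foreach { case (piece, i) =>
      store.bytes(s"broadcast_${broadcastId}_piece$i") = piece // every piece
    }
    val sums = if (checksumEnabled) Some(pieces.map(checksum)) else None
    (pieces.length, sums)
  }
}
```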
blockifyObject[T](obj: T, blockSize: Int, serializer: Serializer, compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]
blockifyObject divides (aka blockifies) the input obj value into blocks (ByteBuffer chunks). blockifyObject uses the given Serializer to write the value in a serialized format to a ChunkedByteBufferOutputStream with chunks of blockSize bytes, compressing the output with the CompressionCodec when one is given.
blockifyObject is used when TorrentBroadcast is requested to store itself as blocks in the local BlockManager (writeBlocks).
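The streaming structure of blockifyObject can be sketched as serializer, then optional compressor, then a chunked output stream. In this sketch `ChunkedOutput` is a hypothetical stand-in for ChunkedByteBufferOutputStream, Java serialization stands in for the Serializer, and GZIP stands in for the CompressionCodec.

```scala
import java.io.{ObjectOutputStream, OutputStream}
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ChunkedByteBufferOutputStream: collects written
// bytes into chunks of at most chunkSize bytes.
final class ChunkedOutput(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")
  private val chunks = ArrayBuffer.empty[Array[Byte]]
  private var pos = chunkSize // no open chunk yet: the first write allocates one

  override def write(b: Int): Unit = {
    if (pos == chunkSize) { chunks += new Array[Byte](chunkSize); pos = 0 }
    val current = chunks.last
    current(pos) = b.toByte
    pos += 1
  }

  // Every chunk is full except possibly the last one
  def toByteBuffers: Array[ByteBuffer] =
    chunks.zipWithIndex.map { case (chunk, i) =>
      val len = if (i == chunks.length - 1) pos else chunkSize
      ByteBuffer.wrap(chunk, 0, len).slice()
    }.toArray
}

object BlockifySketch {
  def blockifyObject(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    val chunked = new ChunkedOutput(blockSize)
    val out: OutputStream = if (compress) new GZIPOutputStream(chunked) else chunked
    val ser = new ObjectOutputStream(out)
    try ser.writeObject(obj) finally ser.close() // close() also finishes the GZIP stream
    chunked.toByteBuffers
  }
}
```

Because compression happens before chunking, a highly compressible value yields fewer blocks.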
doUnpersist(blocking: Boolean): Unit
doDestroy(blocking: Boolean): Unit
doDestroy removes all the persisted state associated with a broadcast variable on all the nodes in a Spark application, i.e. the driver and executors.
unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
unpersist removes all broadcast blocks from executors and, with the given removeFromDriver flag enabled, from the driver.
When executed, unpersist prints out the following DEBUG message in the logs:
Unpersisting TorrentBroadcast [id]
unpersist requests the BlockManagerMaster to remove the broadcast variable (with the given removeFromDriver and blocking flags).
unpersist is used when:
TorrentBroadcast is requested to unpersist a broadcast variable on executors and remove a broadcast variable from the driver and executors
TorrentBroadcastFactory is requested to unbroadcast
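The effect of the removeFromDriver flag can be modeled with one map of block names per BlockManager. `UnpersistSketch` and its maps are hypothetical and not Spark's BlockManagerMaster API; the sketch assumes the block names broadcast_[id] and broadcast_[id]_piece[n] and takes care not to remove blocks of a different broadcast whose ID merely starts with the same digits (e.g. 10 vs 1).

```scala
import scala.collection.mutable

// Hypothetical model of unpersist: each map stands in for one BlockManager's
// blocks, keyed by block name.
object UnpersistSketch {
  // Matches broadcast_<id> and broadcast_<id>_piece<n>, but not broadcast_<id>0
  def belongsTo(blockName: String, id: Long): Boolean =
    blockName == s"broadcast_$id" || blockName.startsWith(s"broadcast_${id}_")

  def unpersist(id: Long, removeFromDriver: Boolean,
                driver: mutable.Map[String, Any],
                executors: Seq[mutable.Map[String, Any]]): Unit = {
    val stores = if (removeFromDriver) driver +: executors else executors
    stores.foreach { store =>
      // Copy the matching keys first, then remove them from the store
      store --= store.keys.filter(belongsTo(_, id)).toList
    }
  }
}
```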
readBlocks goes over the blocks in random order (every block ID from 0 until numBlocks, each exactly once). For every block, readBlocks creates a BroadcastBlockId for the id (of the broadcast variable) and the piece, identified by the piece prefix followed by the block ID (e.g. piece0).
readBlocks prints out the following DEBUG message to the logs:
Reading piece [pieceId] of [broadcastId]
readBlocks first tries to look up the piece locally by requesting the BlockManager to getLocalBytes. If found, readBlocks stores the reference in the local block array (under the piece ID) and releases the lock on the piece.
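If not found locally, readBlocks requests the BlockManager to getRemoteBytes. A piece fetched remotely is then stored in the local BlockManager (and reported to the driver), so other executors can fetch it from this node as well. This is what makes the protocol BitTorrent-like.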
readBlocks throws a SparkException for blocks neither available locally nor remotely:
Failed to get [pieceId] of [broadcastId]
readBlocks is used when TorrentBroadcast is requested to readBroadcastBlock.
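A minimal model of readBlocks under simplifying assumptions: `local` is a hypothetical map standing in for this node's BlockManager and `fetchRemote` stands in for getRemoteBytes. Checksums, locks and storage levels are omitted, and `IllegalStateException` replaces SparkException.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical model of readBlocks (not Spark's API).
object ReadBlocksSketch {
  def readBlocks(broadcastId: Long, numBlocks: Int,
                 local: mutable.Map[String, Array[Byte]],
                 fetchRemote: String => Option[Array[Byte]]): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    // Shuffled order, so nodes do not all request the pieces in the same order
    for (pid <- Random.shuffle((0 until numBlocks).toList)) {
      val pieceId = s"broadcast_${broadcastId}_piece$pid"
      blocks(pid) = local.get(pieceId) match {
        case Some(bytes) => bytes // found locally
        case None =>
          fetchRemote(pieceId) match {
            case Some(bytes) =>
              local(pieceId) = bytes // keep a copy so other nodes can fetch it from here
              bytes
            case None =>
              throw new IllegalStateException(s"Failed to get $pieceId of broadcast_$broadcastId")
          }
      }
    }
    blocks // always in piece order, whatever the fetch order was
  }
}
```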
unBlockifyObject[T: ClassTag](blocks: Array[InputStream], serializer: Serializer, compressionCodec: Option[CompressionCodec]): T
unBlockifyObject is used when TorrentBroadcast is requested to readBroadcastBlock.
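unBlockifyObject is the inverse of blockifyObject: the pieces are concatenated back into one stream, decompressed if needed, and deserialized. In this sketch `UnBlockifySketch` is a hypothetical name, Java serialization stands in for the Serializer, and a `compressed` flag with GZIP stands in for the optional CompressionCodec.

```scala
import java.io.{InputStream, ObjectInputStream, SequenceInputStream}
import java.util.{Arrays, Collections}
import java.util.zip.GZIPInputStream

// Hypothetical model of unBlockifyObject (not Spark's API).
object UnBlockifySketch {
  def unBlockifyObject[T](blocks: Array[InputStream], compressed: Boolean): T = {
    // Concatenate the pieces, in piece order, into a single stream
    val joined: InputStream =
      new SequenceInputStream(Collections.enumeration(Arrays.asList(blocks: _*)))
    val in = if (compressed) new GZIPInputStream(joined) else joined
    val ois = new ObjectInputStream(in)
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```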
Enable ALL logging level for the org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.
Add the following line to
Refer to Logging.