TorrentBroadcast is a Broadcast.md[Broadcast] that uses a BitTorrent-like protocol to distribute broadcast blocks.
.TorrentBroadcast -- Broadcasting using BitTorrent
image::sparkcontext-broadcast-bittorrent.png[align="center"]
When a SparkContext.md#broadcast[broadcast variable is created (using SparkContext.broadcast)] on the driver, a <<creating-instance, new instance of TorrentBroadcast is created>>.
    // On the driver
    val sc: SparkContext = ???
    val anyScalaValue = ???
    val b = sc.broadcast(anyScalaValue) // ← TorrentBroadcast is created
A broadcast variable is stored on the driver's storage:BlockManager.md[BlockManager] as a single value and, separately, as broadcast blocks (after it was <<blockifyObject, divided into blocks>>).
.TorrentBroadcast puts broadcast and the chunks to driver's BlockManager
image::sparkcontext-broadcast-bittorrent-newBroadcast.png[align="center"]
NOTE: TorrentBroadcast-based broadcast variables are created using core:TorrentBroadcastFactory.md[TorrentBroadcastFactory].
== [[creating-instance]] Creating Instance
TorrentBroadcast takes the following to be created:
- [[obj]] Object (the value) to be broadcast
- [[id]] ID
TorrentBroadcast is created when TorrentBroadcastFactory is requested for a core:TorrentBroadcastFactory.md#newBroadcast[new broadcast variable].
== [[_value]] Transient Lazy Broadcast Value
_value is the internal registry for the broadcast value that is <<readBroadcastBlock, read on demand>>.
_value is a @transient private lazy val, which combines two Scala language features:
- It is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and so has to be <<readBroadcastBlock, read again>> there)
- It is lazily instantiated when first requested and cached afterwards
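The effect of combining @transient with lazy val can be seen in a minimal sketch that uses plain Java serialization. LazyHolder, its loads counter, and roundTrip are hypothetical names for illustration only; this is not Spark's TorrentBroadcast.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a broadcast variable (not Spark's class)
class LazyHolder(val id: Long) extends Serializable {
  // Skipped by serialization and computed on first access only
  @transient private lazy val _value: String = {
    LazyHolder.loads.incrementAndGet()
    s"value-of-$id"
  }
  def getValue: String = _value
}

object LazyHolder {
  // Counts how many times any holder computed its value in this JVM
  val loads = new AtomicInteger(0)

  // Java-serialization round trip, standing in for "sent over the wire"
  def roundTrip(h: LazyHolder): LazyHolder = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(h)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[LazyHolder] finally in.close()
  }
}
```

Calling getValue twice on the same instance computes the value once, while a deserialized copy computes it again on its first access.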
== [[numBlocks]] numBlocks Internal Value
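numBlocks is the internal value for the total number of blocks the broadcast variable contains. It is <<writeBlocks, initialized>> when TorrentBroadcast is <<creating-instance, created>>.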
== [[getValue]] Getting Value of Broadcast Variable
def getValue(): T
getValue returns the <<_value, _value>>.
getValue is part of the Broadcast.md#getValue[Broadcast] abstraction.
== [[broadcastId]] BroadcastBlockId
TorrentBroadcast uses a storage:BlockId.md#BroadcastBlockId[BroadcastBlockId] for...FIXME
== [[readBroadcastBlock]] readBroadcastBlock Internal Method
readBroadcastBlock SparkEnv.md#get[uses the SparkEnv] to access SparkEnv.md#broadcastManager[BroadcastManager] that is requested for BroadcastManager.md#cachedValues[cached broadcast values].
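readBroadcastBlock looks up the <<broadcastId, BroadcastBlockId>> in the cached broadcast values and returns the value if found.
If not found, readBroadcastBlock requests the SparkEnv for the core:SparkEnv.md#conf[SparkConf] and <<setConf, sets the compression codec and the block size>>.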
readBroadcastBlock SparkEnv.md#get[uses the SparkEnv] to access SparkEnv.md#blockManager[BlockManager].
readBroadcastBlock requests the BlockManager for storage:BlockManager.md#getLocalValues[getLocalValues].
If the broadcast data is available locally, readBroadcastBlock <<releaseLock, releases the lock>> on the broadcast block and returns the value.
If, however, the broadcast data is not found locally, you should see the following INFO message in the logs:
Started reading broadcast variable [id]
readBroadcastBlock then <<readBlocks, reads the broadcast blocks>>, and you should see the following INFO message in the logs:
Reading broadcast variable [id] took [usedTimeMs]
NOTE: readBroadcastBlock uses the core:SparkEnv.md#serializer[current Serializer] and the internal io:CompressionCodec.md[CompressionCodec] to bring all the blocks together as one single broadcast variable.
readBroadcastBlock storage:BlockManager.md#putSingle[stores the broadcast variable with MEMORY_AND_DISK storage level in the local BlockManager]. If storing the broadcast variable fails, readBroadcastBlock throws a SparkException:
Failed to store [broadcastId] in BlockManager
The broadcast variable is returned.
readBroadcastBlock is used when TorrentBroadcast is requested for the <<_value, broadcast value>>.
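The lookup order described in this section can be sketched with in-memory maps. ReadFlow and its constructor parameters are hypothetical and only mirror the names in the text. Writing the value back to the cached values is an assumption of this sketch, and locking and logging are left out.

```scala
import scala.collection.mutable

// Hypothetical in-memory model; names mirror the text, not Spark's API
final class ReadFlow(
    cachedValues: mutable.Map[String, Any],   // stands in for BroadcastManager's cached values
    localStore: mutable.Map[String, Any],     // stands in for the local BlockManager
    readBlocks: () => Array[Array[Byte]],     // fetches the broadcast blocks
    unBlockify: Array[Array[Byte]] => Any) {  // reassembles the value from blocks

  def readBroadcastBlock(broadcastId: String): Any =
    cachedValues.get(broadcastId) match {
      case Some(cached) => cached             // 1. cached broadcast values
      case None =>
        val value = localStore.get(broadcastId) match {
          case Some(local) => local           // 2. local block store
          case None =>
            val rebuilt = unBlockify(readBlocks()) // 3. read and join the blocks
            localStore.put(broadcastId, rebuilt)   // 4. store the whole value locally
            rebuilt
        }
        cachedValues.put(broadcastId, value)  // assumption: remember it for later lookups
        value
    }
}
```

With this model, a second call for the same broadcastId is answered from the cached values and does not fetch the blocks again.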
== [[setConf]] setConf Internal Method
setConf(conf: SparkConf): Unit
setConf uses the given SparkConf.md[SparkConf] to set the compression codec and the block size.
Internally, setConf reads the core:BroadcastManager.md#spark.broadcast.compress[spark.broadcast.compress] configuration property and, if enabled (which it is by default), sets a io:CompressionCodec.md#createCodec[CompressionCodec] (as the internal compressionCodec field).
setConf also reads the core:BroadcastManager.md#spark_broadcast_blockSize[spark.broadcast.blockSize] Spark property and sets the block size (as the internal blockSize field).
setConf is executed when TorrentBroadcast is <<creating-instance, created>> and when it is requested to <<readBroadcastBlock, readBroadcastBlock>>.
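As a rough illustration of what setConf derives from the configuration, the sketch below reads the two properties from a plain Map that stands in for SparkConf. BroadcastSettings and fromConf are hypothetical names. The sketch assumes the block size is given as a plain number of KiB and uses 4096 KiB (Spark's documented 4m default), whereas the real property also accepts size strings such as 4m.

```scala
// Hypothetical model of the two settings; a plain Map stands in for SparkConf
final case class BroadcastSettings(compress: Boolean, blockSizeBytes: Int)

object BroadcastSettings {
  def fromConf(conf: Map[String, String]): BroadcastSettings = {
    // Compression stays enabled unless it is explicitly switched off
    val compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
    // Assumption: a plain number of KiB; size suffixes are not parsed here
    val blockSizeKb = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt
    BroadcastSettings(compress, blockSizeKb * 1024)
  }
}
```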
== [[writeBlocks]] Storing Broadcast and Blocks to BlockManager
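writeBlocks(value: T): Int
writeBlocks stores the given value (that is the <<obj, broadcast value>>) and its blocks in the local BlockManager and returns the <<numBlocks, number of blocks>>.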
Internally, writeBlocks uses the core:SparkEnv.md#get[SparkEnv] to access core:SparkEnv.md#blockManager[BlockManager].
writeBlocks requests the BlockManager to storage:BlockManager.md#putSingle[putSingle] (with MEMORY_AND_DISK storage level).
For every block, writeBlocks creates a storage:BlockId.md#BroadcastBlockId[BroadcastBlockId] for the <<id, broadcast variable ID>> and the piece[index] identifier, and requests the BlockManager to storage:BlockManager.md#putBytes[putBytes] (with MEMORY_AND_DISK_SER storage level).
The entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, whereas the blocks are stored with MEMORY_AND_DISK_SER storage level.
In case of an error while storing the value or the blocks, writeBlocks throws a SparkException:
Failed to store [pieceId] of [broadcastId] in local BlockManager
writeBlocks is used when TorrentBroadcast is <<creating-instance, created>> (to initialize <<numBlocks, numBlocks>>).
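The two kinds of entries that writeBlocks leaves behind can be modelled with a mutable map. WriteBlocksSketch is a hypothetical stand-in for the driver's BlockManager, the storage levels are kept as plain strings, and the block names assume Spark's broadcast_[id] and broadcast_[id]_piece[index] naming. Error handling is omitted.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the driver's BlockManager
object WriteBlocksSketch {
  def writeBlocks(
      broadcastId: Long,
      value: Any,
      blocks: Array[Array[Byte]],   // the value after it was divided into blocks
      store: mutable.Map[String, (Any, String)]): Int = {
    // The whole value is stored once under the broadcast ID
    store.put(s"broadcast_$broadcastId", (value, "MEMORY_AND_DISK"))
    // Every block is stored under its own piece ID, in serialized form
    blocks.zipWithIndex.foreach { case (block, index) =>
      store.put(s"broadcast_${broadcastId}_piece$index", (block, "MEMORY_AND_DISK_SER"))
    }
    // The number of blocks is what the caller keeps as numBlocks
    blocks.length
  }
}
```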
== [[blockifyObject]] Chunking Broadcast Variable Into Blocks
blockifyObject divides (aka blockifies) the input obj value into blocks (ByteBuffer chunks). blockifyObject uses the given serializer:Serializer.md[Serializer] to write the value in a serialized format to a ChunkedByteBufferOutputStream of the given blockSize size with the optional io:CompressionCodec.md[CompressionCodec].
blockifyObject is used when TorrentBroadcast is requested to <<writeBlocks, writeBlocks>>.
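The chunking idea can be tried out with the standard library alone. In the sketch below, Java serialization and GZIP are stand-ins for Spark's Serializer and CompressionCodec, and BlockifySketch.blockify is a hypothetical helper. It also cuts the bytes after serialization has finished, rather than writing into a chunked output stream as blockifyObject does.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object BlockifySketch {
  // Java serialization and GZIP stand in for Spark's Serializer and CompressionCodec
  def blockify(obj: AnyRef, blockSize: Int, compress: Boolean): Array[Array[Byte]] = {
    val buffer = new ByteArrayOutputStream()
    // Optionally wrap the buffer in a compressing stream
    val sink = if (compress) new GZIPOutputStream(buffer) else buffer
    val out = new ObjectOutputStream(sink)
    try out.writeObject(obj) finally out.close()
    // Cut the serialized bytes into chunks of at most blockSize bytes
    buffer.toByteArray.grouped(blockSize).toArray
  }
}
```

Every chunk except possibly the last one has exactly blockSize bytes.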
== [[doUnpersist]] doUnpersist Method
doUnpersist(blocking: Boolean): Unit
doUnpersist is part of the Broadcast.md#contract[Broadcast Variable Contract] and is executed when a broadcast variable is requested to unpersist.
== [[doDestroy]] doDestroy Method
doDestroy(blocking: Boolean): Unit
doDestroy is executed when Broadcast.md#destroy-internal[Broadcast removes the persisted data and metadata related to a broadcast variable].
== [[unpersist]] unpersist Utility
unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
unpersist removes all broadcast blocks from executors and, with the given removeFromDriver flag enabled, from the driver.
When executed, unpersist prints out the following DEBUG message in the logs:
Unpersisting TorrentBroadcast [id]
unpersist requests storage:BlockManagerMaster.md#removeBroadcast[BlockManagerMaster to remove the broadcast] with the given id.
NOTE: unpersist uses core:SparkEnv.md#blockManager[SparkEnv to get the BlockManager] (and, through it, the BlockManagerMaster).
unpersist is used when:
- TorrentBroadcast is requested to doUnpersist and doDestroy
- TorrentBroadcastFactory is requested to TorrentBroadcastFactory.md#unbroadcast[unbroadcast]
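The effect of the removeFromDriver flag can be illustrated with sets of block names. UnpersistSketch is a hypothetical model in which each set represents the blocks held by one block store. It assumes Spark's broadcast_[id] block naming and ignores the blocking flag.

```scala
import scala.collection.mutable

object UnpersistSketch {
  // Each mutable.Set holds the block IDs kept by one (hypothetical) block store
  def unpersist(
      id: Long,
      removeFromDriver: Boolean,
      driver: mutable.Set[String],
      executors: Seq[mutable.Set[String]]): Unit = {
    // Matches the whole-value block and every piece of this broadcast only
    def belongs(blockId: String): Boolean =
      blockId == s"broadcast_$id" || blockId.startsWith(s"broadcast_${id}_")
    // Executors are always cleaned; the driver only when asked to
    val targets = if (removeFromDriver) executors :+ driver else executors
    targets.foreach(store => store.filter(belongs).foreach(store.remove))
  }
}
```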
== [[readBlocks]] Reading Broadcast Blocks
readBlocks creates a local array of storage:BlockData.md[BlockData]s for <<numBlocks, numBlocks>> elements (one per broadcast block).
readBlocks uses the core:SparkEnv.md to access core:SparkEnv.md#blockManager[BlockManager] (that is later used to fetch local or remote blocks).
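For every block (randomly chosen by block ID between 0 and <<numBlocks, numBlocks>>), readBlocks creates a storage:BlockId.md#BroadcastBlockId[BroadcastBlockId] for the <<id, broadcast variable ID>> and the piece prefix followed by the ID.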
readBlocks prints out the following DEBUG message to the logs:
Reading piece [pieceId] of [broadcastId]
readBlocks first tries to look up the piece locally by requesting the BlockManager to storage:BlockManager.md#getLocalBytes[getLocalBytes] and, if found, stores the reference in the local block array (for the piece ID) and <<releaseLock, releases the lock>> on the piece.
If not found locally, readBlocks requests the BlockManager to storage:BlockManager.md#getRemoteBytes[getRemoteBytes].
readBlocks throws a SparkException for blocks neither available locally nor remotely:
Failed to get [pieceId] of [broadcastId]
readBlocks is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>>.
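The local-first, remote-second lookup in random piece order can be sketched as follows. ReadBlocksSketch is hypothetical: the local and remote functions stand in for getLocalBytes and getRemoteBytes, an IllegalStateException replaces SparkException, and the piece names assume Spark's broadcast_[id]_piece[index] naming.

```scala
import scala.util.Random

object ReadBlocksSketch {
  // local and remote are hypothetical lookups standing in for
  // BlockManager.getLocalBytes and BlockManager.getRemoteBytes
  def readBlocks(
      broadcastId: Long,
      numBlocks: Int,
      local: String => Option[Array[Byte]],
      remote: String => Option[Array[Byte]]): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    // Visit the piece indices in random order
    for (pid <- Random.shuffle((0 until numBlocks).toList)) {
      val pieceId = s"broadcast_${broadcastId}_piece$pid"
      // Try the local store first; ask remote peers only on a miss
      blocks(pid) = local(pieceId).orElse(remote(pieceId)).getOrElse(
        throw new IllegalStateException(s"Failed to get $pieceId of broadcast_$broadcastId"))
    }
    blocks
  }
}
```

Although the pieces are fetched in random order, each one lands at its own index, so the returned array is always in piece order.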
== [[unBlockifyObject]] unBlockifyObject Utility
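unBlockifyObject[T: ClassTag](blocks: Array[InputStream], serializer: Serializer, compressionCodec: Option[CompressionCodec]): T
unBlockifyObject is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>>.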
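Joining blocks back into a single value can be sketched by chaining the blocks into one stream. UnBlockifySketch.unBlockify is a hypothetical helper, with Java serialization and GZIP again standing in for Spark's Serializer and CompressionCodec. It assumes the blocks are passed in their original order.

```scala
import java.io.{ByteArrayInputStream, InputStream, ObjectInputStream, SequenceInputStream}
import java.util.zip.GZIPInputStream

object UnBlockifySketch {
  def unBlockify[T](blocks: Array[Array[Byte]], compressed: Boolean): T = {
    val streams = new java.util.ArrayList[InputStream]()
    blocks.foreach(block => streams.add(new ByteArrayInputStream(block)))
    // Present the blocks, in order, as one continuous stream
    val joined: InputStream =
      new SequenceInputStream(java.util.Collections.enumeration(streams))
    // Undo the optional compression before deserializing
    val source = if (compressed) new GZIPInputStream(joined) else joined
    val in = new ObjectInputStream(source)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Because the blocks are chained rather than copied into one large array, the whole serialized value never has to sit in a single contiguous buffer.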
== [[releaseLock]] releaseLock Internal Method
releaseLock(blockId: BlockId): Unit
releaseLock is used when TorrentBroadcast is requested to <<readBroadcastBlock, readBroadcastBlock>> and <<readBlocks, readBlocks>>.
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.
Add the following line to your log4j configuration:
log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL
Refer to spark-logging.md[Logging].