TorrentBroadcast
= TorrentBroadcast
TorrentBroadcast is a Broadcast.md[] that uses a BitTorrent-like protocol for broadcast blocks distribution.
.TorrentBroadcast -- Broadcasting using BitTorrent image::sparkcontext-broadcast-bittorrent.png[align="center"]
When a SparkContext.md#broadcast[broadcast variable is created (using SparkContext.broadcast
)] on the driver, a <
[source, scala]¶
// On the driver val sc: SparkContext = ??? val anyScalaValue = ??? val b = sc.broadcast(anyScalaValue) // ← TorrentBroadcast is created
A broadcast variable is stored on the driver's storage:BlockManager.md[BlockManager] as a single value and separately as broadcast blocks (after it was <
.TorrentBroadcast puts broadcast and the chunks to driver's BlockManager image::sparkcontext-broadcast-bittorrent-newBroadcast.png[align="center"]
NOTE: TorrentBroadcast-based broadcast variables are created using core:TorrentBroadcastFactory.md[TorrentBroadcastFactory].
== [[creating-instance]] Creating Instance
TorrentBroadcast takes the following to be created:
- [[obj]] Object (the value) to be broadcast
- [[id]] ID
TorrentBroadcast is created when TorrentBroadcastFactory is requested for a core:TorrentBroadcastFactory.md#newBroadcast[new broadcast variable].
== [[_value]] Transient Lazy Broadcast Value
[source, scala]¶
_value: T¶
TorrentBroadcast uses _value
internal registry for the value that is <
_value
is a @transient private lazy val
and uses two Scala language features:
-
It is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and has to be <
> afterwards) -
It is lazily instantiated when first requested and cached afterwards
== [[numBlocks]] numBlocks Internal Value
TorrentBroadcast uses numBlocks
internal value for the total number of blocks it contains. It is <
== [[getValue]] Getting Value of Broadcast Variable
[source, scala]¶
def getValue(): T¶
getValue returns the <<_value, _value>>.
getValue is part of the Broadcast.md#getValue[Broadcast] abstraction.
== [[broadcastId]] BroadcastBlockId
TorrentBroadcast uses a storage:BlockId.md#BroadcastBlockId[BroadcastBlockId] for...FIXME
== [[readBroadcastBlock]] readBroadcastBlock Internal Method
[source, scala]¶
readBroadcastBlock(): T¶
readBroadcastBlock SparkEnv.md#get[uses the SparkEnv] to access SparkEnv.md#broadcastManager[BroadcastManager] that is requested for BroadcastManager.md#cachedValues[cached broadcast values].
readBroadcastBlock looks up the <
If not found, readBroadcastBlock requests the SparkEnv for the core:SparkEnv.md#conf[SparkConf] and <
readBroadcastBlock SparkEnv.md#get[uses the SparkEnv] to access SparkEnv.md#blockManager[BlockManager].
readBroadcastBlock requests the BlockManager for storage:BlockManager.md#getLocalValues[getLocalValues].
If the broadcast data was available locally, readBroadcastBlock <
If however the broadcast data was not found locally, you should see the following INFO message in the logs:
[source,plaintext]¶
Started reading broadcast variable [id]¶
readBroadcastBlock <
You should see the following INFO message in the logs:
[source,plaintext]¶
Reading broadcast variable [id] took [usedTimeMs]¶
readBroadcastBlock <
NOTE: readBroadcastBlock uses the core:SparkEnv.md#serializer[current Serializer
] and the internal io:CompressionCodec.md[CompressionCodec] to bring all the blocks together as one single broadcast variable.
readBroadcastBlock storage:BlockManager.md#putSingle[stores the broadcast variable with MEMORY_AND_DISK
storage level to the local BlockManager
]. When storing the broadcast variable was unsuccessful, a SparkException
is thrown.
[source,plaintext]¶
Failed to store [broadcastId] in BlockManager¶
The broadcast variable is returned.
readBroadcastBlock is used when TorrentBroadcast is requested for the <<_value, broadcast value>>.
== [[setConf]] setConf Internal Method
[source, scala]¶
setConf( conf: SparkConf): Unit
setConf uses the input conf
SparkConf.md[SparkConf] to set compression codec and the block size.
Internally, setConf reads core:BroadcastManager.md#spark.broadcast.compress[spark.broadcast.compress] configuration property and if enabled (which it is by default) sets a io:CompressionCodec.md#createCodec[CompressionCodec] (as an internal compressionCodec
property).
setConf also reads core:BroadcastManager.md#spark_broadcast_blockSize[spark.broadcast.blockSize] Spark property and sets the block size (as the internal blockSize
property).
setConf is executed when <
== [[writeBlocks]] Storing Broadcast and Blocks to BlockManager
[source, scala]¶
writeBlocks( value: T): Int
writeBlocks stores the given value (that is the <
Internally, writeBlocks uses the core:SparkEnv.md#get[SparkEnv] to access core:SparkEnv.md#blockManager[BlockManager].
writeBlocks requests the BlockManager to storage:BlockManager.md#putSingle[putSingle] (with MEMORY_AND_DISK storage level).
writeBlocks <
For every block, writeBlocks creates a storage:BlockId.md#BroadcastBlockId[BroadcastBlockId] for the <piece[index]
identifier, and requests the BlockManager to storage:BlockManager.md#putBytes[putBytes] (with MEMORY_AND_DISK_SER storage level).
The entire broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level whereas the blocks with MEMORY_AND_DISK_SER storage level.
With <
In case of an error while storing the value or the blocks, writeBlocks throws a SparkException:
[source,plaintext]¶
Failed to store [pieceId] of [broadcastId] in local BlockManager¶
writeBlocks is used when TorrentBroadcast is <
== [[blockifyObject]] Chunking Broadcast Variable Into Blocks
[source, scala]¶
blockifyObjectT: Array[ByteBuffer]
blockifyObject divides (aka blockifies) the input obj
value into blocks (ByteBuffer
chunks). blockifyObject uses the given serializer:Serializer.md[] to write the value in a serialized format to a ChunkedByteBufferOutputStream
of the given blockSize
size with the optional io:CompressionCodec.md[CompressionCodec].
blockifyObject is used when TorrentBroadcast is requested to <
== [[doUnpersist]] doUnpersist
Method
[source, scala]¶
doUnpersist(blocking: Boolean): Unit¶
doUnpersist
<
NOTE: doUnpersist
is part of the Broadcast.md#contract[Broadcast
Variable Contract] and is executed from <
== [[doDestroy]] doDestroy
Method
[source, scala]¶
doDestroy(blocking: Boolean): Unit¶
doDestroy
<
NOTE: doDestroy
is executed when Broadcast.md#destroy-internal[Broadcast
removes the persisted data and metadata related to a broadcast variable].
== [[unpersist]] unpersist Utility
[source, scala]¶
unpersist( id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
unpersist removes all broadcast blocks from executors and, with the given removeFromDriver flag enabled, from the driver.
When executed, unpersist prints out the following DEBUG message in the logs:
[source,plaintext]¶
Unpersisting TorrentBroadcast [id]¶
unpersist requests storage:BlockManagerMaster.md#removeBroadcast[BlockManagerMaster
to remove the id
broadcast].
NOTE: unpersist uses core:SparkEnv.md#blockManager[SparkEnv
to get the BlockManagerMaster
] (through blockManager
property).
unpersist is used when:
-
TorrentBroadcast is requested to <
> and < > -
TorrentBroadcastFactory is requested to TorrentBroadcastFactory.md#unbroadcast[unbroadcast]
== [[readBlocks]] Reading Broadcast Blocks
[source, scala]¶
readBlocks(): Array[BlockData]¶
readBlocks creates a local array of storage:BlockData.md[]s for <
readBlocks uses the core:SparkEnv.md[] to access core:SparkEnv.md#blockManager[BlockManager] (that is later used to fetch local or remote blocks).
For every block (randomly-chosen by block ID between 0 and <piece
prefix followed by the ID.
readBlocks prints out the following DEBUG message to the logs:
[source,plaintext]¶
Reading piece [pieceId] of [broadcastId]¶
readBlocks first tries to look up the piece locally by requesting the BlockManager to storage:BlockManager.md#getLocalBytes[getLocalBytes] and, if found, stores the reference in the local block array (for the piece ID) and <
If not found locally, readBlocks requests the BlockManager to storage:BlockManager.md#getRemoteBytes[getRemoteBytes].
readBlocks...FIXME
readBlocks throws a SparkException for blocks neither available locally nor remotely:
[source,plaintext]¶
Failed to get [pieceId] of [broadcastId]¶
readBlocks is used when TorrentBroadcast is requested to <
== [[unBlockifyObject]] unBlockifyObject Utility
[source, scala]¶
unBlockifyObjectT: ClassTag: T
unBlockifyObject...FIXME
unBlockifyObject is used when TorrentBroadcast is requested to <
== [[releaseLock]] releaseLock Internal Method
[source, scala]¶
releaseLock( blockId: BlockId): Unit
releaseLock...FIXME
releaseLock is used when TorrentBroadcast is requested to <
== [[logging]] Logging
Enable ALL
logging level for org.apache.spark.broadcast.TorrentBroadcast
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
[source]¶
log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL¶
Refer to spark-logging.md[Logging].