TorrentBroadcast¶
TorrentBroadcast is a Broadcast that uses a BitTorrent-like protocol to distribute broadcast blocks.
Creating Instance¶
TorrentBroadcast
takes the following to be created:
- Broadcast Value (of type T)
- Identifier
TorrentBroadcast
is created when:
TorrentBroadcastFactory
is requested for a new broadcast variable
BroadcastBlockId¶
TorrentBroadcast
creates a BroadcastBlockId (with the id) when created
Number of Block Chunks¶
TorrentBroadcast uses numBlocks for the number of blocks that the value of a broadcast variable was blockified into when created.
Transient Lazy Broadcast Value¶
_value: T
TorrentBroadcast
uses _value
transient registry for the value that is computed on demand (and cached afterwards).
_value
is a @transient private lazy val
and uses the following Scala language features:
- It is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and has to be re-computed afterwards)
- It is lazily instantiated when first requested and cached afterwards
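The two properties above can be observed outside Spark. The following is a minimal sketch, assuming plain Java serialization; LazyHolder and its computations counter are hypothetical names made up for the illustration and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a broadcast variable; not Spark's TorrentBroadcast.
class LazyHolder(val id: Long) extends Serializable {
  // Skipped by Java serialization and computed on first access only.
  @transient private lazy val _value: String = {
    LazyHolder.computations += 1
    s"value-of-$id"
  }

  def value: String = _value
}

object LazyHolder {
  // Counts how many times any instance computed its lazy value in this JVM.
  var computations: Int = 0

  // Serializes and deserializes a holder, as if it were shipped to an executor.
  def roundTrip(holder: LazyHolder): LazyHolder = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(holder)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[LazyHolder] finally in.close()
  }
}
```

Reading value twice on the same instance computes it once. Reading it on a copy returned by roundTrip computes it again, because the cached field did not travel with the serialized object.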
Value¶
getValue(): T
getValue
uses the _value transient registry for the value if available (non-null).
Otherwise, getValue
reads the broadcast block (from the local BroadcastManager, BlockManager or falls back to readBlocks).
getValue
saves the object in the _value registry.
getValue
is part of the Broadcast abstraction.
Reading Broadcast Block¶
readBroadcastBlock(): T
readBroadcastBlock
looks up the BroadcastBlockId in (the cache of) BroadcastManager and returns the value if found.
Otherwise, readBroadcastBlock calls setConf and requests the BlockManager for the locally-stored broadcast data.
If the broadcast block is found locally, readBroadcastBlock
requests the BroadcastManager
to cache it and returns the value.
If not found locally, readBroadcastBlock multiplies the numBlocks by the blockSize for an estimated size of the broadcast block.
readBroadcastBlock prints out the following INFO message to the logs:
Started reading broadcast variable [id] with [numBlocks] pieces (estimated total size [estimatedTotalSize])
readBroadcastBlock reads the block chunks (using readBlocks) and prints out the following INFO message to the logs:
Reading broadcast variable [id] took [time] ms
readBroadcastBlock
unblockifies the block chunks into an object (using the Serializer and the CompressionCodec).
readBroadcastBlock
requests the BlockManager to store the merged copy (so other tasks on this executor don't need to re-fetch it). readBroadcastBlock uses the MEMORY_AND_DISK storage level with the tellMaster flag off.
readBroadcastBlock
requests the BroadcastManager
to cache it and returns the value.
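The lookup order described above (BroadcastManager cache, then the local BlockManager, then the block chunks) can be sketched as follows. This is only an illustration under stated assumptions: BroadcastReader, managerCache, localBlocks and fetchAndMerge are hypothetical in-memory stand-ins, not Spark APIs, and the merged value is a plain String.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-ins for the BroadcastManager cache and the
// local BlockManager; none of these names are Spark APIs.
class BroadcastReader(
    id: Long,
    numBlocks: Int,
    blockSize: Int,
    fetchAndMerge: () => String) {

  val managerCache = mutable.Map.empty[Long, String]
  val localBlocks = mutable.Map.empty[Long, String]

  // An upper bound on the payload size: the last piece may be partially filled.
  def estimatedTotalSize: Long = numBlocks.toLong * blockSize

  def read(): String =
    managerCache.get(id) match {
      case Some(cached) => cached // 1. already cached by the manager
      case None =>
        val value = localBlocks.get(id) match {
          case Some(local) => local // 2. merged copy stored locally
          case None =>
            // 3. fetch the pieces, merge them and keep the merged copy locally
            val merged = fetchAndMerge()
            localBlocks(id) = merged
            merged
        }
        managerCache(id) = value
        value
    }
}
```

With 3 pieces of 4194304 bytes each (an example block size), estimatedTotalSize is 12582912 bytes. A second call to read never reaches fetchAndMerge, which is the point of caching the merged copy.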
Unblockifying Broadcast Value¶
unBlockifyObject(
blocks: Array[InputStream],
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): T
unBlockifyObject
...FIXME
Reading Broadcast Block Chunks¶
readBlocks(): Array[BlockData]
readBlocks
creates a collection of BlockDatas for numBlocks block chunks.
For every block (visited in random order of piece IDs from 0 up to, but not including, numBlocks), readBlocks creates a BroadcastBlockId for the id (of the broadcast variable) and the chunk (identified by the piece prefix followed by the ID).
readBlocks
prints out the following DEBUG message to the logs:
Reading piece [pieceId] of [broadcastId]
readBlocks
first tries to look up the piece locally by requesting the BlockManager
to getLocalBytes and, if found, stores the reference in the local block array (for the piece ID).
If not found in the local BlockManager, readBlocks requests the BlockManager to getRemoteBytes.
With checksumEnabled, readBlocks
...FIXME
readBlocks
requests the BlockManager
to store the chunk (so other tasks on this executor don't need to re-fetch it) using MEMORY_AND_DISK_SER
storage level and reporting to the driver (so other executors can pull these chunks from this executor as well).
readBlocks
creates a ByteBufferBlockData for the chunk (and stores it in the blocks
array).
readBlocks throws a SparkException for a block that is available neither locally nor remotely:
Failed to get [pieceId] of [broadcastId]
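The local-then-remote lookup described above can be sketched as follows. The sketch makes several assumptions: PieceReader and readPieces are hypothetical names, plain maps keyed by piece index stand in for the local and remote block stores, IllegalStateException stands in for SparkException, and the block names mirror the broadcast_[id]_piece[index] pattern. The checksum step is left out.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical stand-ins: plain maps keyed by piece index play the role of
// the local and remote block stores. These are not Spark APIs.
object PieceReader {
  def readPieces(
      broadcastId: Long,
      numBlocks: Int,
      local: mutable.Map[Int, Array[Byte]],
      remote: Map[Int, Array[Byte]]): Array[Array[Byte]] = {
    val broadcastName = s"broadcast_$broadcastId"
    val blocks = new Array[Array[Byte]](numBlocks)
    // Visit the pieces in random order.
    for (pid <- Random.shuffle((0 until numBlocks).toList)) {
      val pieceName = s"${broadcastName}_piece$pid"
      val piece = local.get(pid).orElse {
        // Not available locally: fetch remotely and keep a local copy.
        val fetched = remote.get(pid)
        fetched.foreach(bytes => local(pid) = bytes)
        fetched
      }
      blocks(pid) = piece.getOrElse(
        throw new IllegalStateException(s"Failed to get $pieceName of $broadcastName"))
    }
    blocks
  }
}
```

The pieces end up in the result array at their own index regardless of the order in which they were fetched, so the shuffle only affects who is asked for which piece first.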
CompressionCodec¶
compressionCodec: Option[CompressionCodec]
TorrentBroadcast
uses the spark.broadcast.compress configuration property to decide whether a CompressionCodec is used for writeBlocks and readBroadcastBlock.
Broadcast Block Chunk Size¶
TorrentBroadcast
uses the spark.broadcast.blockSize configuration property for the size of the chunks (pieces) of a broadcast block.
TorrentBroadcast
uses the size for writeBlocks and readBroadcastBlock.
Persisting Broadcast (to BlockManager)¶
writeBlocks(
value: T): Int
writeBlocks
returns the number of blocks (chunks) this broadcast variable was blockified into.
The whole broadcast value is stored in the local BlockManager with the MEMORY_AND_DISK storage level, while the block chunks are stored with the MEMORY_AND_DISK_SER storage level.
writeBlocks
is used when:
TorrentBroadcast
is created (that happens on the driver only)
writeBlocks
requests the BlockManager to store the given broadcast value (to be identified as the broadcastId and with the MEMORY_AND_DISK
storage level).
writeBlocks blockifies the object (into chunks of the block size, using the Serializer and the optional compressionCodec).
With checksumEnabled writeBlocks
...FIXME
For every block, writeBlocks
creates a BroadcastBlockId for the id and piece[index]
identifier, and requests the BlockManager
to store the chunk bytes (with MEMORY_AND_DISK_SER
storage level and reporting to the driver).
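The two kinds of entries that writeBlocks leaves behind (the whole value and the individual chunks) can be pictured with a small sketch. Everything here is an assumption made for the illustration: PieceWriter and Store are hypothetical, a map from block name to a (storage level label, payload) pair stands in for the BlockManager, the value is a String cut into UTF-8 byte chunks, and the names follow the broadcast_[id] and broadcast_[id]_piece[index] pattern. Checksums and error handling are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the local BlockManager: a map from block name to
// (storage level label, payload). Not a Spark API.
object PieceWriter {
  type Store = mutable.LinkedHashMap[String, (String, Any)]

  // Stores the whole value, then every chunk; returns the number of chunks.
  def writeBlocks(id: Long, value: String, blockSize: Int, store: Store): Int = {
    val broadcastName = s"broadcast_$id"
    store(broadcastName) = ("MEMORY_AND_DISK", value)
    val chunks = value.getBytes("UTF-8").grouped(blockSize).toArray
    chunks.zipWithIndex.foreach { case (chunk, i) =>
      store(s"${broadcastName}_piece$i") = ("MEMORY_AND_DISK_SER", chunk)
    }
    chunks.length
  }
}
```

A 10-byte value with a block size of 4 yields three chunks (4, 4 and 2 bytes), so the store ends up with four entries: one for the value and one per chunk.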
Blockifying Broadcast Variable¶
blockifyObject(
obj: T,
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]
blockifyObject divides (blockifies) the input obj broadcast value into blocks (ByteBuffer chunks). blockifyObject uses the given Serializer to write the value in a serialized format to a ChunkedByteBufferOutputStream of the given blockSize size with the optional CompressionCodec.
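The idea of cutting a serialized value into fixed-size chunks and reassembling it can be sketched without Spark. This is a simplified illustration, not the actual implementation: Blockifier is a hypothetical name, plain Java serialization replaces Spark's Serializer, a byte array replaces ChunkedByteBufferOutputStream, and compression is left out.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, SequenceInputStream}
import java.nio.ByteBuffer

// Hypothetical helper; uses Java serialization instead of Spark's Serializer.
object Blockifier {
  // Serializes obj and cuts the bytes into chunks of at most blockSize bytes.
  def blockify(obj: java.io.Serializable, blockSize: Int): Array[ByteBuffer] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // Concatenates the chunks into one stream and deserializes the object.
  def unblockify[T](blocks: Array[ByteBuffer]): T = {
    val streams: Array[InputStream] = blocks.map { b =>
      new ByteArrayInputStream(b.array(), b.arrayOffset() + b.position(), b.remaining()): InputStream
    }
    val merged = new SequenceInputStream(
      java.util.Collections.enumeration(java.util.Arrays.asList(streams: _*)))
    val in = new ObjectInputStream(merged)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Every chunk except possibly the last holds exactly blockSize bytes, and Blockifier.unblockify[String](Blockifier.blockify(value, 64)) gives back a value equal to the original String.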
Error Handling¶
In case of any error, writeBlocks
prints out the following ERROR message to the logs and requests the local BlockManager
to remove the broadcast.
Store broadcast [broadcastId] fail, remove all pieces of the broadcast
In case of an error while storing the value itself, writeBlocks throws a SparkException:
Failed to store [broadcastId] in BlockManager
In case of an error while storing the chunks of the blockified value, writeBlocks throws a SparkException:
Failed to store [pieceId] of [broadcastId] in local BlockManager
Destroying Variable¶
doDestroy(
blocking: Boolean): Unit
doDestroy
removes the persisted state (associated with the broadcast variable) on all the nodes in a Spark application (the driver and executors).
doDestroy
is part of the Broadcast abstraction.
Unpersisting Variable¶
doUnpersist(
blocking: Boolean): Unit
doUnpersist
removes the persisted state (associated with the broadcast variable) on executors only.
doUnpersist
is part of the Broadcast abstraction.
Removing Persisted State (Broadcast Blocks) of Broadcast Variable¶
unpersist(
id: Long,
removeFromDriver: Boolean,
blocking: Boolean): Unit
unpersist
prints out the following DEBUG message to the logs:
Unpersisting TorrentBroadcast [id]
In the end, unpersist
requests the BlockManagerMaster to remove the blocks of the given broadcast.
unpersist
is used when:
- TorrentBroadcast is requested to unpersist and destroy
- TorrentBroadcastFactory is requested to unbroadcast
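The difference between unpersisting and destroying comes down to the removeFromDriver flag. The following sketch shows that relationship under stated assumptions: BroadcastCleaner is a hypothetical class, a map from node name to block names stands in for the cluster, the node called "driver" plays the driver, and the blocking flag is ignored.

```scala
import scala.collection.mutable

// Hypothetical cluster model: node name -> names of the blocks it holds.
// The driver is the node called "driver". Not a Spark API.
class BroadcastCleaner(nodes: mutable.Map[String, mutable.Set[String]]) {
  // Drops every block of the broadcast; the driver is spared unless asked.
  def unpersist(id: Long, removeFromDriver: Boolean): Unit = {
    val prefix = s"broadcast_$id"
    for ((node, blocks) <- nodes if removeFromDriver || node != "driver") {
      // Match the value block and its pieces, but not e.g. broadcast_10.
      val doomed = blocks.filter(b => b == prefix || b.startsWith(prefix + "_"))
      doomed.foreach(b => blocks -= b)
    }
  }

  def doUnpersist(id: Long): Unit = unpersist(id, removeFromDriver = false)
  def doDestroy(id: Long): Unit = unpersist(id, removeFromDriver = true)
}
```

After doUnpersist the driver still holds its blocks, so executors could fetch them again. After doDestroy no node holds them.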
setConf¶
setConf(
conf: SparkConf): Unit
setConf
uses the given SparkConf to initialize the compressionCodec, the blockSize and the checksumEnabled.
setConf
is used when:
TorrentBroadcast
is created and re-created (when deserialized on executors)
Logging¶
Enable ALL logging level for org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL
Refer to Logging.