Class: Spark::Broadcast
- Inherits: Object
  - Object
  - Spark::Broadcast
- Defined in: lib/spark/broadcast.rb
Overview
Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Example:
broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')
broadcast3 = $sc.broadcast([1,2,3])

func = Proc.new do |part, index|
  [
    broadcast1.value * index,
    broadcast2.value * index,
    broadcast3.value.reduce(:+)
  ]
end

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2, broadcast3: broadcast3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
# => ["", "", 6, "a", "b", 6, "aa", "bb", 6, "aaa", "bbb", 6]
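The result shown above can be checked without a cluster. The following is a minimal plain-Ruby sketch, assuming that each partition's returned array is concatenated in partition order (which the output above is consistent with). `simulate_partitions` is a hypothetical helper, not part of ruby-spark.

```ruby
# Plain-Ruby simulation of the example; no Spark context is involved.
# simulate_partitions is a hypothetical helper, not a ruby-spark method.
def simulate_partitions(num_partitions, a, b, list)
  # Each partition index yields three values; flat_map concatenates the
  # per-partition arrays in index order.
  (0...num_partitions).flat_map do |index|
    [a * index, b * index, list.reduce(:+)]
  end
end

p simulate_partitions(4, 'a', 'b', [1, 2, 3])
# => ["", "", 6, "a", "b", 6, "aa", "bb", 6, "aaa", "bbb", 6]
```

With 4 partitions the index runs from 0 to 3, so the strings repeat 0 to 3 times and the sum of [1, 2, 3] appears once per partition.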
Constant Summary
- LOADED = 0 # id, value, path
- NOT_LOADED = 1 # id, path
- WITHOUT_PATH = 2 # id
- @@registered = {}
Instance Attribute Summary
- #id ⇒ Object (readonly): Returns the value of attribute id.
- #jbroadcast ⇒ Object (readonly): Returns the value of attribute jbroadcast.
- #path ⇒ Object (readonly): Returns the value of attribute path.
- #state ⇒ Object (readonly): Returns the value of attribute state.
Class Method Summary
- .register(id, path) ⇒ Object
Instance Method Summary
- #initialize(sc, value) ⇒ Broadcast (constructor): Creates a new Broadcast and dumps the value to disk.
- #inspect ⇒ Object
- #marshal_dump ⇒ Object
- #marshal_load(id) ⇒ Object
- #value ⇒ Object
Constructor Details
#initialize(sc, value) ⇒ Broadcast
Creates a new Broadcast and dumps the value to disk.
b = $sc.broadcast('a')
b.value # => 'a'
b.path
b.jbroadcast
# File 'lib/spark/broadcast.rb', line 48
def initialize(sc, value)
  @id = object_id
  @value = value
  @state = LOADED

  file = Tempfile.create('broadcast', sc.temp_dir)
  file.binmode
  file.write(Marshal.dump(value))
  file.close

  @path = file.path
  @jbroadcast = RubyRDD.readBroadcastFromFile(sc.jcontext, @path, Spark.jb.to_long(@id))

  ObjectSpace.define_finalizer(self, proc { File.unlink(@path) })
end
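The file-handling part of the constructor can be tried in isolation. This is a minimal sketch that covers only the Marshal dump to a temporary file and the read back; the JVM call (`RubyRDD.readBroadcastFromFile`) and the finalizer are left out. `dump_to_disk` and `load_from_disk` are hypothetical helper names, and the system temp directory stands in for `sc.temp_dir`.

```ruby
require 'tempfile'
require 'tmpdir'

# Hypothetical helper: write a Marshal dump of value to a temp file in dir
# and return the file's path. The file is not deleted automatically.
def dump_to_disk(value, dir = Dir.tmpdir)
  file = Tempfile.create('broadcast', dir)
  file.binmode                    # Marshal output is binary data
  file.write(Marshal.dump(value))
  file.close
  file.path
end

# Hypothetical helper: read a Marshal dump back from path.
def load_from_disk(path)
  Marshal.load(File.binread(path))
end

path = dump_to_disk([1, 2, 3])
p load_from_disk(path)  # prints the array that was written
File.unlink(path)       # manual cleanup, in place of the finalizer
```

Because `Tempfile.create` without a block leaves the file on disk, something has to remove it later; in the source above that job is given to the finalizer.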
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/spark/broadcast.rb', line 33
def id
  @id
end
#jbroadcast ⇒ Object (readonly)
Returns the value of attribute jbroadcast.
# File 'lib/spark/broadcast.rb', line 33
def jbroadcast
  @jbroadcast
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
# File 'lib/spark/broadcast.rb', line 33
def path
  @path
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/spark/broadcast.rb', line 33
def state
  @state
end
Class Method Details
.register(id, path) ⇒ Object
# File 'lib/spark/broadcast.rb', line 71
def self.register(id, path)
  @@registered[id] = path
end
Instance Method Details
#inspect ⇒ Object
# File 'lib/spark/broadcast.rb', line 64
def inspect
  result = %{#<#{self.class.name}:0x#{object_id}\n}
  result << %{   ID: #{@id}\n}
  result << %{Value: #{@value.to_s[0, 10]}>}
  result
end
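The value shown by #inspect is cut to at most 10 characters of its string form. A small sketch of that truncation follows; `preview` is a hypothetical helper that isolates the `to_s[0, 10]` expression from the source above.

```ruby
# Hypothetical helper isolating the truncation used in #inspect.
def preview(value, limit = 10)
  # String#[] with (start, length) returns at most `length` characters.
  value.to_s[0, limit]
end

p preview('abcdefghijklmnop')  # => "abcdefghij"
p preview([1, 2, 3])           # => "[1, 2, 3]"
```

Note also that interpolating `object_id` into the string yields decimal digits, so the text after the `0x` prefix in the inspect output is not actually hexadecimal.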
#marshal_dump ⇒ Object
# File 'lib/spark/broadcast.rb', line 95
def marshal_dump
  @id
end
#marshal_load(id) ⇒ Object
# File 'lib/spark/broadcast.rb', line 99
def marshal_load(id)
  @id = id
  @state = WITHOUT_PATH
end
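Taken together, #marshal_dump and #marshal_load mean that only the id survives serialization; the value and path do not travel with the object. The sketch below illustrates this with a hypothetical stand-in class, `Handle`, whose two marshal hooks mirror the source above. Everything else in it is an assumption made for the illustration.

```ruby
# Hypothetical stand-in class; only the two marshal hooks mirror the source.
class Handle
  LOADED = 0
  WITHOUT_PATH = 2

  attr_reader :id, :state

  def initialize(value)
    @id = object_id
    @value = value
    @state = LOADED
  end

  # Only the id is written to the Marshal stream.
  def marshal_dump
    @id
  end

  # The restored object knows its id and nothing else.
  def marshal_load(id)
    @id = id
    @state = WITHOUT_PATH
  end

  # True when this instance still holds a value in memory.
  def value_present?
    instance_variable_defined?(:@value)
  end
end

# Hypothetical helper: serialize and deserialize in one step.
def round_trip(handle)
  Marshal.load(Marshal.dump(handle))
end

original = Handle.new('payload')
copy = round_trip(original)
p copy.id == original.id  # => true
p copy.state              # => 2
p copy.value_present?     # => false
```

A restored object in the WITHOUT_PATH state therefore has to find its data some other way, which is what the path lookup in #value is for.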
#value ⇒ Object
# File 'lib/spark/broadcast.rb', line 75
def value
  case state
  when LOADED
    @value
  when NOT_LOADED
    @value = Marshal.load(File.read(@path))
    @state = LOADED
    @value
  when WITHOUT_PATH
    @path = @@registered[id]

    if @path
      @state = NOT_LOADED
      value
    else
      raise Spark::BroadcastError, "Broadcast #{@id} do not have registered path."
    end
  end
end
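The three states form a small lazy-loading chain: WITHOUT_PATH resolves a path from the registry, NOT_LOADED reads the file, and LOADED returns the cached value. The following is a self-contained sketch of that chain, assuming a constant hash in place of `@@registered` and `KeyError` in place of `Spark::BroadcastError`. `LazyValue` and `register_dump` are hypothetical names.

```ruby
require 'tempfile'

# Hypothetical class illustrating the three-state lookup; not ruby-spark code.
class LazyValue
  LOADED = 0
  NOT_LOADED = 1
  WITHOUT_PATH = 2
  REGISTERED = {} # id => path, stands in for @@registered

  attr_reader :state

  def self.register(id, path)
    REGISTERED[id] = path
  end

  def initialize(id)
    @id = id
    @state = WITHOUT_PATH
  end

  def value
    case @state
    when LOADED
      @value
    when NOT_LOADED
      @value = Marshal.load(File.binread(@path))
      @state = LOADED
      @value
    when WITHOUT_PATH
      @path = REGISTERED[@id]
      raise KeyError, "no path registered for #{@id}" unless @path
      @state = NOT_LOADED
      value # retry, now in the NOT_LOADED state
    end
  end
end

# Hypothetical helper: dump value to a temp file and register it under id.
def register_dump(id, value)
  file = Tempfile.create('lazy')
  file.binmode
  file.write(Marshal.dump(value))
  file.close
  LazyValue.register(id, file.path)
  file.path
end

path = register_dump(1, %w[x y])
lazy = LazyValue.new(1)
p lazy.state  # => 2
p lazy.value  # => ["x", "y"]
p lazy.state  # => 0
File.unlink(path)
```

After the first call the value is cached, so later calls do not touch the file again.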