Class: Spark::Broadcast

Inherits: Object
Defined in:
lib/spark/broadcast.rb

Overview

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable is sent to each node of the cluster only once.

Example:

broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')
broadcast3 = $sc.broadcast([1,2,3])

func = Proc.new do |part, index|
  [
    broadcast1.value * index,
    broadcast2.value * index,
    broadcast3.value.reduce(:+)
  ]
end

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2, broadcast3: broadcast3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
# => ["", "", 6, "a", "b", 6, "aa", "bb", 6, "aaa", "bbb", 6]

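Constant Summary

LOADED = 0
  The instance holds the id, the value and the path.
NOT_LOADED = 1
  The instance holds the id and the path; the value is read from the file on first access.
WITHOUT_PATH = 2
  The instance holds only the id; the path is looked up in the registry before the value is loaded.
@@registered = {}
  Registry mapping broadcast ids to file paths (filled by Broadcast.register).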

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(sc, value) ⇒ Broadcast

Create a new Broadcast and dump its value to disk.

b = $sc.broadcast('a')

b.value # => 'a'
b.path
b.jbroadcast


Source lines 48–62:
# File 'lib/spark/broadcast.rb', line 48

# Create a new broadcast: keep +value+ in memory, marshal it into a
# temporary file under +sc.temp_dir+ and hand that file to the JVM side.
#
#   b = $sc.broadcast('a')
#   b.value # => 'a'
#
# @param sc [Object] context exposing +temp_dir+ and +jcontext+
#   (presumably a Spark::Context — TODO confirm)
# @param value [Object] any object that Marshal can dump
# @raise [TypeError] if +value+ cannot be marshalled; no temp file is
#   created in that case
def initialize(sc, value)
  @id = object_id
  @value = value
  @state = LOADED

  # Dump first, so an unmarshallable value does not leave an orphan file.
  payload = Marshal.dump(value)

  file = Tempfile.create('broadcast', sc.temp_dir)
  begin
    file.binmode
    file.write(payload)
  ensure
    file.close
  end

  @path = file.path

  # Register cleanup before calling into the JVM, so the file is still
  # removed if that call raises. The proc comes from a class method so it
  # does not close over +self+; a proc that references the object would
  # keep it reachable forever, and the finalizer would never run.
  ObjectSpace.define_finalizer(self, self.class.temp_file_remover(@path))

  # NOTE(review): Spark.jb.to_long presumably converts the id to a Java long.
  @jbroadcast = RubyRDD.readBroadcastFromFile(sc.jcontext, @path, Spark.jb.to_long(@id))
end

# Build a finalizer proc that deletes +path+. It is defined at class level
# so the proc captures only +path+, never the broadcast instance. A file
# that has already been removed is ignored.
#
# @param path [String] file to delete when the owning object is collected
# @return [Proc]
def self.temp_file_remover(path)
  proc do
    begin
      File.unlink(path)
    rescue Errno::ENOENT
      # Already gone; nothing left to clean up.
    end
  end
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



Source lines 33–35:
# File 'lib/spark/broadcast.rb', line 33

# Identifier of this broadcast. #initialize sets it to the object's
# object_id; #marshal_load restores it on deserialized copies.
def id
  instance_variable_get(:@id)
end

#jbroadcast ⇒ Object (readonly)

Returns the value of attribute jbroadcast.



Source lines 33–35:
# File 'lib/spark/broadcast.rb', line 33

# JVM-side handle returned by RubyRDD.readBroadcastFromFile in #initialize.
# Instances rebuilt through #marshal_load never set it, so it is nil there.
def jbroadcast
  instance_variable_get(:@jbroadcast)
end

#path ⇒ Object (readonly)

Returns the value of attribute path.



Source lines 33–35:
# File 'lib/spark/broadcast.rb', line 33

# Location of the file holding the marshalled value. #initialize sets it
# to the temp file it writes. On deserialized instances, #value looks it up
# in the class-level registry.
def path
  instance_variable_get(:@path)
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



Source lines 33–35:
# File 'lib/spark/broadcast.rb', line 33

# Loading state: one of LOADED, NOT_LOADED or WITHOUT_PATH. #value
# dispatches on it to decide how to obtain the value.
def state
  instance_variable_get(:@state)
end

Class Method Details

.register(id, path) ⇒ Object



Source lines 71–73:
# File 'lib/spark/broadcast.rb', line 71

# Store the file path for broadcast +id+ in the class-level registry.
# Deserialized instances (state WITHOUT_PATH) resolve their path from here
# inside #value.
#
# @param id [Object] broadcast id, as produced by #marshal_dump
# @param path [String] presumably the path of the marshalled value — TODO confirm
# @return [Object] the stored +path+
def self.register(id, path)
  @@registered.store(id, path)
end

Instance Method Details

#inspect ⇒ Object



Source lines 64–69:
# File 'lib/spark/broadcast.rb', line 64

# Human-readable summary: class name, object id, broadcast id and the
# first 10 characters of the in-memory value's string form.
#
# Only +@value+ is shown; lazy loading is not triggered. A deserialized
# instance that has not loaded its value yet therefore shows an empty value.
#
# object_id is rendered in hexadecimal so that the "0x" prefix is accurate.
# The result is built without mutating a literal, so it also works under
# `# frozen_string_literal: true` on Ruby 2.x.
def inspect
  "#<#{self.class.name}:0x#{object_id.to_s(16)}\n" \
    "   ID: #{@id}\n" \
    "Value: #{@value.to_s[0, 10]}>"
end

#marshal_dump ⇒ Object



Source lines 95–97:
# File 'lib/spark/broadcast.rb', line 95

# Serialization hook: only the broadcast id is written. The value and path
# stay behind; a deserialized copy recovers the path through the registry
# (see #marshal_load and #value).
def marshal_dump
  instance_variable_get(:@id)
end

#marshal_load(id) ⇒ Object



Source lines 99–102:
# File 'lib/spark/broadcast.rb', line 99

# Deserialization hook: rebuild the instance from the id produced by
# #marshal_dump. Only the id is available, so the state becomes
# WITHOUT_PATH and #value must resolve the path via the registry.
def marshal_load(id)
  instance_variable_set(:@id, id)
  instance_variable_set(:@state, WITHOUT_PATH)
end

#value ⇒ Object



Source lines 75–93:
# File 'lib/spark/broadcast.rb', line 75

# Return the broadcast value, loading it lazily when needed.
#
# Dispatches on #state:
# * LOADED       - the value is already in memory and returned as is;
# * NOT_LOADED   - the value is unmarshalled from +@path+, cached, and the
#                  state becomes LOADED;
# * WITHOUT_PATH - the path is looked up in the class-level registry
#                  (filled by Broadcast.register), then loaded as above.
#
# The file holds raw Marshal bytes written in binary mode (as #initialize
# does). It is therefore read with File.binread. A text-mode File.read may
# transcode the data (when Encoding.default_internal is set) or translate
# line endings (on Windows), which corrupts the payload.
#
# NOTE(review): Marshal.load is only acceptable because the file is expected
# to be produced by this library; never register a path to untrusted data.
#
# @raise [Spark::BroadcastError] if no path is registered for this id
def value
  case state
  when LOADED
    @value
  when NOT_LOADED
    @value = Marshal.load(File.binread(@path))
    @state = LOADED
    @value
  when WITHOUT_PATH
    @path = @@registered[@id]

    if @path
      @state = NOT_LOADED
      value
    else
      raise Spark::BroadcastError, "Broadcast #{@id} do not have registered path."
    end
  end
end