Class: Spark::Accumulator
- Inherits:
-
Object
- Object
- Spark::Accumulator
- Defined in:
- lib/spark/accumulator.rb
Overview
A shared variable that can be accumulated, i.e., one with a commutative and associative “add” operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` operator, but only the driver program can read its value, through #value. Updates from the workers are propagated automatically to the driver program.
Arguments:
- value
-
Initial value for the accumulator. This value is stored only in the driver process.
- accum_param
-
How to merge two values in a worker or driver process. A Symbol or a Proc (or a String that evaluates to a Proc).
- zero_value
-
Initial value in each worker process.
Examples:
accum1 = $sc.accumulator(1)
accum2 = $sc.accumulator(2, :*, 1)
accum3 = $sc.accumulator(3, lambda{|max, val| val > max ? val : max})
accum1 += 1
accum2.add(2)
accum2.add(2)
accum2.add(2)
accum3.add(9)
accum3.add(6)
accum3.add(7)
accum1.value # => 2
accum2.value # => 16
accum3.value # => 9
func = Proc.new do |_, index|
  accum1.add(1)
  accum2.add(2)
  accum3.add(index * 10)
end
rdd = $sc.parallelize(0..4, 4)
rdd = rdd.bind(accum1: accum1, accum2: accum2, accum3: accum3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
accum1.value # => 6
accum2.value # => 256
accum3.value # => 30
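The final values above (6, 256, 30) can be reproduced in plain Ruby without a cluster. The sketch below is only a simulation: `simulate` is a hypothetical helper, and it assumes that each worker copy starts from zero_value and that the driver merges every worker result with the same operation. That assumption is consistent with the numbers in the example, but the page does not show the merge code itself.

```ruby
# Plain-Ruby simulation; no Spark cluster or ruby-spark gem is needed.
# simulate is a hypothetical helper, not part of Spark::Accumulator.
def simulate(driver_value, operation, zero_value, terms_per_worker)
  terms_per_worker.reduce(driver_value) do |merged, terms|
    # Each worker copy starts from zero_value and folds in its own terms.
    partial = terms.reduce(zero_value) { |value, term| operation.call(value, term) }
    # Assumption: the driver merges a worker result with the same operation.
    operation.call(merged, partial)
  end
end

max = lambda { |current, candidate| candidate > current ? candidate : current }

# Four partitions, so func runs once per partition with indexes 0..3.
puts simulate(2,  :+.to_proc, 0, [[1], [1], [1], [1]])      # accum1
puts simulate(16, :*.to_proc, 1, [[2], [2], [2], [2]])      # accum2
puts simulate(9,  max,        0, [[0], [10], [20], [30]])   # accum3
```

This also suggests why accum2 is created with a zero_value of 1: under the stated assumption, a worker copy that started from 0 would multiply every result down to 0.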
Defined Under Namespace
Classes: Server
Constant Summary
- SUPPORTED_SYMBOLS =
[:+, :-, :*, :/, :**]
- @@instances =
{}
- @@changed =
[]
Instance Attribute Summary
-
#accum_param ⇒ Object
readonly
Returns the value of attribute accum_param.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#value ⇒ Object
readonly
Returns the value of attribute value.
-
#zero_value ⇒ Object
readonly
Returns the value of attribute zero_value.
Class Method Summary
- .changed ⇒ Object
- .instances ⇒ Object
Instance Method Summary
- #+(term) ⇒ Object
-
#add(term) ⇒ Object
Operations.
- #add_by_symbol(term) ⇒ Object
-
#driver? ⇒ Boolean
Driver process or worker.
-
#initialize(value, accum_param = :+, zero_value = 0) ⇒ Accumulator
constructor
Creating and selecting Spark::Accumulator.
- #inspect ⇒ Object
- #load_accum_param ⇒ Object
-
#marshal_dump ⇒ Object
Dump and load.
- #marshal_load(array) ⇒ Object
- #valid_accum_param ⇒ Object
Constructor Details
#initialize(value, accum_param = :+, zero_value = 0) ⇒ Accumulator
Creating and selecting Spark::Accumulator
# File 'lib/spark/accumulator.rb', line 68
def initialize(value, accum_param=:+, zero_value=0)
  @id = object_id
  @value = value
  @accum_param = accum_param
  @zero_value = zero_value
  @driver = true

  valid_accum_param

  @@instances[@id] = self
end
Instance Attribute Details
#accum_param ⇒ Object (readonly)
Returns the value of attribute accum_param.
# File 'lib/spark/accumulator.rb', line 57
def accum_param
  @accum_param
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/spark/accumulator.rb', line 57
def id
  @id
end
#value ⇒ Object (readonly)
Returns the value of attribute value.
# File 'lib/spark/accumulator.rb', line 57
def value
  @value
end
#zero_value ⇒ Object (readonly)
Returns the value of attribute zero_value.
# File 'lib/spark/accumulator.rb', line 57
def zero_value
  @zero_value
end
Class Method Details
.changed ⇒ Object
# File 'lib/spark/accumulator.rb', line 88
def self.changed
  @@changed
end
.instances ⇒ Object
# File 'lib/spark/accumulator.rb', line 92
def self.instances
  @@instances
end
Instance Method Details
#+(term) ⇒ Object
# File 'lib/spark/accumulator.rb', line 147
def +(term)
  add(term)
  self
end
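Returning self from #+ is what makes the `accum1 += 1` form in the examples work: Ruby rewrites `a += b` as `a = a + b`, so the variable is reassigned to whatever #+ returns. A minimal plain-Ruby sketch of that rule follows; `TinyAccumulator` and `same_object_after_plus_equals?` are hypothetical names used only for illustration, not part of ruby-spark.

```ruby
# TinyAccumulator is a hypothetical stand-in, not the real Spark::Accumulator.
class TinyAccumulator
  attr_reader :value

  def initialize(value)
    @value = value
  end

  def add(term)
    @value += term
  end

  # Returning self keeps `acc += term` bound to the same object.
  def +(term)
    add(term)
    self
  end
end

# Checks that `+=` mutates the accumulator instead of replacing it.
def same_object_after_plus_equals?
  acc = TinyAccumulator.new(1)
  original = acc
  acc += 1 # Ruby expands this to: acc = acc + 1
  acc.equal?(original) && acc.value == 2
end

puts same_object_after_plus_equals?
```

If #+ returned the result of add instead, `acc += 1` would rebind the variable to a plain number and the accumulator object would be lost.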
#add(term) ⇒ Object
Operations
# File 'lib/spark/accumulator.rb', line 135
def add(term)
  if !driver? && !@@changed.include?(self)
    @@changed << self
  end

  if @accum_param.is_a?(Proc)
    @value = @accum_param.call(@value, term)
  else
    add_by_symbol(term)
  end
end
#add_by_symbol(term) ⇒ Object
# File 'lib/spark/accumulator.rb', line 152
def add_by_symbol(term)
  case @accum_param
  when :+
    @value += term
  when :-
    @value -= term
  when :*
    @value *= term
  when :/
    @value /= term
  when :**
    @value **= term
  end
end
#driver? ⇒ Boolean
Driver process or worker
# File 'lib/spark/accumulator.rb', line 127
def driver?
  @driver
end
#inspect ⇒ Object
# File 'lib/spark/accumulator.rb', line 80
def inspect
  result = %{#<#{self.class.name}:0x#{object_id}\n}
  result << %{ ID: #{@id}\n}
  result << %{ Zero: #{@zero_value.to_s[0, 10]}\n}
  result << %{Value: #{@value.to_s[0, 10]}>}
  result
end
#load_accum_param ⇒ Object
# File 'lib/spark/accumulator.rb', line 183
def load_accum_param
  if @serialized_accum_param.is_a?(String)
    @accum_param = eval(@serialized_accum_param)
  else
    @accum_param = @serialized_accum_param
  end
end
#marshal_dump ⇒ Object
Dump and load
# File 'lib/spark/accumulator.rb', line 171
def marshal_dump
  [@id, @zero_value, @serialized_accum_param]
end
#marshal_load(array) ⇒ Object
# File 'lib/spark/accumulator.rb', line 175
def marshal_load(array)
  @id, @zero_value, @serialized_accum_param = array

  @value = @zero_value
  @driver = false
  load_accum_param
end
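Taken together, #marshal_dump and #marshal_load mean that the driver-side value is never serialized: a deserialized copy starts from zero_value and reports that it is not the driver. The sketch below illustrates this with Ruby's standard Marshal hooks; `SketchAccumulator` and `round_trip` are hypothetical names, and the class is a reduced stand-in that omits the id and accum_param fields.

```ruby
# SketchAccumulator is a hypothetical, reduced stand-in for illustration.
class SketchAccumulator
  attr_reader :value, :zero_value

  def initialize(value, zero_value = 0)
    @value = value
    @zero_value = zero_value
    @driver = true
  end

  def driver?
    @driver
  end

  # Only the zero value is dumped; the driver-side value stays behind.
  def marshal_dump
    [@zero_value]
  end

  # The loaded copy starts from zero_value and is flagged as a worker copy.
  def marshal_load(array)
    @zero_value = array.first
    @value = @zero_value
    @driver = false
  end
end

# Serializes and deserializes, as would happen when shipping to a worker.
def round_trip(accumulator)
  Marshal.load(Marshal.dump(accumulator))
end

copy = round_trip(SketchAccumulator.new(16, 1))
puts copy.value   # the zero value, not 16
puts copy.driver? # false
```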
#valid_accum_param ⇒ Object
# File 'lib/spark/accumulator.rb', line 96
def valid_accum_param
  if @accum_param.is_a?(Symbol)
    raise Spark::AccumulatorError, "Unsupported symbol #{@accum_param}" unless SUPPORTED_SYMBOLS.include?(@accum_param)
    @serialized_accum_param = @accum_param
    return
  end

  if @accum_param.is_a?(Proc)
    begin
      @serialized_accum_param = @accum_param.to_source
      return
    rescue
      raise Spark::SerializeError, 'Proc can not be serialized. Use String instead.'
    end
  end

  if @accum_param.is_a?(String)
    @serialized_accum_param = @accum_param
    @accum_param = eval(@accum_param)

    unless @accum_param.is_a?(Proc)
      raise Spark::SerializeError, 'Yours param is not a Proc.'
    end

    return
  end

  raise Spark::AccumulatorError, 'Unsupported param. Use Symbol, Proc or String.'
end
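The String branch above accepts Ruby source text, evaluates it, and rejects anything that does not evaluate to a Proc. A minimal plain-Ruby sketch of that check is shown here; `proc_from_source` is a hypothetical helper, and ArgumentError stands in for Spark::SerializeError so the snippet runs without the gem.

```ruby
# proc_from_source is a hypothetical helper, not part of ruby-spark.
# It mirrors the String branch: evaluate the source, then insist on a Proc.
def proc_from_source(source)
  candidate = eval(source) # only evaluate source text you trust
  unless candidate.is_a?(Proc)
    # Stand-in for Spark::SerializeError in this sketch.
    raise ArgumentError, 'source does not evaluate to a Proc'
  end
  candidate
end

max = proc_from_source('lambda { |max, val| val > max ? val : max }')
puts max.call(3, 9) # => 9

begin
  proc_from_source('42')
rescue ArgumentError => e
  puts e.message
end
```

Passing the merge function as a String may be useful when a Proc cannot be turned back into source, which is the case the 'Use String instead.' error message points to.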