Class: Spark::Accumulator

Inherits:
Object
  • Object
show all
Defined in:
lib/spark/accumulator.rb,
lib/spark/accumulator.rb

Overview

A shared variable that can be accumulated, i.e., has a commutative and associative “add” operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program.

Arguments:

value

Initial value for the accumulator. This value is stored only on the driver process.

accum_param

How to merge two values on a worker or driver process. Symbol or Proc (or String).

zero_value

Initial value for worker process

Examples:

# accum1 uses the default :+ operation.
# accum2 multiplies and passes 1 as its zero_value (the starting value on workers).
# accum3 merges with a lambda that keeps the larger of the two values.
accum1 = $sc.accumulator(1)
accum2 = $sc.accumulator(2, :*, 1)
accum3 = $sc.accumulator(3, lambda{|max, val| val > max ? val : max})

# `+` calls add and returns the accumulator itself, so += keeps the same object.
accum1 += 1

accum2.add(2)
accum2.add(2)
accum2.add(2)

accum3.add(9)
accum3.add(6)
accum3.add(7)

# 1 + 1
accum1.value # => 2
# 2 * 2 * 2 * 2
accum2.value # => 16
# max(3, 9, 6, 7)
accum3.value # => 9

# Block executed by map_partitions_with_index; it updates all three accumulators.
func = Proc.new do |_, index|
  accum1.add(1)
  accum2.add(2)
  accum3.add(index * 10)
end

# The data is split into 4 partitions; bind passes the accumulators to the RDD
# under the given names.
rdd = $sc.parallelize(0..4, 4)
rdd = rdd.bind(accum1: accum1, accum2: accum2, accum3: accum3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect

# The results below are consistent with func running once per partition
# (indexes 0..3) -- presumably how map_partitions_with_index behaves; confirm.
# 2 + 1 * 4
accum1.value # => 6
# 16 * 2 * 2 * 2 * 2
accum2.value # => 256
# max(9, 0, 10, 20, 30)
accum3.value # => 30

Defined Under Namespace

Classes: Server

Constant Summary collapse

# Operator symbols accepted as accum_param; valid_accum_param rejects any
# other Symbol with Spark::AccumulatorError.
SUPPORTED_SYMBOLS =
[:+, :-, :*, :/, :**]
# Every accumulator created through initialize, keyed by its id.
@@instances =
{}
# Accumulators that have been modified via add on a non-driver process.
@@changed =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value, accum_param = :+, zero_value = 0) ⇒ Accumulator

Creating and selecting Spark::Accumulator



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/spark/accumulator.rb', line 68

# Builds a driver-side accumulator and registers it in the class-level
# instance table.
#
# @param value [Object] starting value held by this (driver) instance
# @param accum_param [Symbol, Proc, String] how two values are merged;
#   checked by valid_accum_param
# @param zero_value [Object] value an instance starts from after marshal_load
# @raise [Spark::AccumulatorError, Spark::SerializeError] when accum_param
#   is rejected by valid_accum_param
def initialize(value, accum_param=:+, zero_value=0)
  # Instances built through the constructor are flagged as driver-side;
  # marshal_load is what clears this flag.
  @driver = true
  @id = object_id
  @value, @zero_value = value, zero_value
  @accum_param = accum_param

  # Validate before registering so a rejected accumulator is never stored.
  valid_accum_param

  @@instances[@id] = self
end

Instance Attribute Details

#accum_paramObject (readonly)

Returns the value of attribute accum_param.



57
58
59
# File 'lib/spark/accumulator.rb', line 57

# Returns the merge operation held in @accum_param: a supported Symbol or a
# Proc (a String given to the constructor is replaced by the Proc it
# evaluates to in valid_accum_param).
def accum_param
  @accum_param
end

#idObject (readonly)

Returns the value of attribute id.



57
58
59
# File 'lib/spark/accumulator.rb', line 57

# Returns @id. initialize sets it to the instance's object_id, and
# marshal_load restores it from the dumped array.
def id
  @id
end

#valueObject (readonly)

Returns the value of attribute value.



57
58
59
# File 'lib/spark/accumulator.rb', line 57

# Returns the current accumulated value (@value).
def value
  @value
end

#zero_valueObject (readonly)

Returns the value of attribute zero_value.



57
58
59
# File 'lib/spark/accumulator.rb', line 57

# Returns @zero_value, the value that marshal_load uses as the starting
# @value of a restored instance.
def zero_value
  @zero_value
end

Class Method Details

.changedObject



88
89
90
# File 'lib/spark/accumulator.rb', line 88

# Returns the class-level list @@changed. `add` appends an accumulator to it
# the first time that accumulator is modified on a non-driver process.
def self.changed
  @@changed
end

.instancesObject



92
93
94
# File 'lib/spark/accumulator.rb', line 92

# Returns the class-level hash @@instances, which maps an accumulator id to
# the instance registered by initialize.
def self.instances
  @@instances
end

Instance Method Details

#+(term) ⇒ Object



147
148
149
150
# File 'lib/spark/accumulator.rb', line 147

# Folds +term+ into the accumulator via add and returns the accumulator
# itself, so `accum += x` rebinds the variable to the same object.
#
# @param term [Object] value to merge in
# @return [self]
def +(term)
  tap { add(term) }
end

#add(term) ⇒ Object

Operations



135
136
137
138
139
140
141
142
143
144
145
# File 'lib/spark/accumulator.rb', line 135

# Merges +term+ into the current value using the configured accum_param.
#
# On a non-driver process the accumulator is also recorded once in the
# class-level @@changed list.
#
# @param term [Object] value to merge in
# @return [Object] the result of the Proc call, or whatever add_by_symbol
#   returns for a Symbol accum_param
def add(term)
  @@changed << self unless driver? || @@changed.include?(self)

  # Anything that is not a Proc is handed to the symbol-based implementation.
  return add_by_symbol(term) unless @accum_param.is_a?(Proc)

  @value = @accum_param.call(@value, term)
end

#add_by_symbol(term) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/spark/accumulator.rb', line 152

# Applies the operator named by @accum_param (:+, :-, :*, :/ or :**) to the
# current value and +term+, storing the result back in @value.
#
# @param term [Object] right-hand operand
# @return [Object, nil] the new value, or nil (value untouched) when
#   @accum_param is not one of the five operators
def add_by_symbol(term)
  return unless [:+, :-, :*, :/, :**].include?(@accum_param)

  # `a op= b` is `a = a.op(b)`, so dispatching on the symbol is equivalent.
  @value = @value.public_send(@accum_param, term)
end

#driver?Boolean

Driver process or worker

Returns:

  • (Boolean)


127
128
129
# File 'lib/spark/accumulator.rb', line 127

# Returns the @driver flag: initialize sets it to true, marshal_load sets it
# to false.
def driver?
  @driver
end

#inspectObject



80
81
82
83
84
85
86
# File 'lib/spark/accumulator.rb', line 80

# Multi-line, human-readable summary of the accumulator.
#
# The header shows the class name and this instance's object_id in
# hexadecimal (the "0x" prefix previously preceded a decimal number).
# Zero and Value are cut to their first 10 characters to keep the output
# short.
#
# @return [String]
def inspect
  [
    "#<#{self.class.name}:0x#{object_id.to_s(16)}",
    "   ID: #{@id}",
    " Zero: #{@zero_value.to_s[0, 10]}",
    "Value: #{@value.to_s[0, 10]}>"
  ].join("\n")
end

#load_accum_paramObject



183
184
185
186
187
188
189
# File 'lib/spark/accumulator.rb', line 183

# Rebuilds @accum_param from @serialized_accum_param: a String is evaluated
# as Ruby source, any other value is used as-is.
#
# NOTE(review): eval runs arbitrary Ruby. Presumably the string always comes
# from a dump produced by this class on the driver -- confirm before feeding
# it data from anywhere else.
#
# @return [Object] the restored accum_param
def load_accum_param
  @accum_param =
    case @serialized_accum_param
    when String then eval(@serialized_accum_param)
    else @serialized_accum_param
    end
end

#marshal_dumpObject

Dump and load



171
172
173
# File 'lib/spark/accumulator.rb', line 171

# State handed to Marshal: id, zero value and the serialized accum param.
# The current @value is not part of the dump; marshal_load starts the
# restored instance from @zero_value instead.
def marshal_dump
  [@id, @zero_value, @serialized_accum_param]
end

#marshal_load(array) ⇒ Object



175
176
177
178
179
180
181
# File 'lib/spark/accumulator.rb', line 175

# Restores an instance from the array produced by marshal_dump.
#
# The restored instance starts from the zero value, is flagged as
# non-driver, and has its accum_param rebuilt by load_accum_param.
#
# @param array [Array] [id, zero_value, serialized_accum_param]
# @return [Object] whatever load_accum_param returns
def marshal_load(array)
  restored_id, restored_zero, restored_param = array

  @id = restored_id
  @serialized_accum_param = restored_param
  @zero_value = restored_zero
  @value = restored_zero
  @driver = false

  load_accum_param
end

#valid_accum_paramObject



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/spark/accumulator.rb', line 96

# Validates @accum_param and prepares @serialized_accum_param, the form that
# marshal_dump writes out.
#
# * Symbol - must be listed in SUPPORTED_SYMBOLS; stored unchanged.
# * Proc   - serialized through to_source.
# * String - kept as the serialized form and evaluated; the result replaces
#   @accum_param and must be a Proc.
#
# @return [nil]
# @raise [Spark::AccumulatorError] for an unsupported Symbol or any other type
# @raise [Spark::SerializeError] when a Proc cannot be converted to source or
#   a String does not evaluate to a Proc
def valid_accum_param
  case @accum_param
  when Symbol
    unless SUPPORTED_SYMBOLS.include?(@accum_param)
      raise Spark::AccumulatorError, "Unsupported symbol #{@accum_param}"
    end

    @serialized_accum_param = @accum_param
  when Proc
    begin
      @serialized_accum_param = @accum_param.to_source
    rescue
      # Any failure while extracting the source is reported the same way.
      raise Spark::SerializeError, 'Proc can not be serialized. Use String instead.'
    end
  when String
    # Keep the original source text for dumping, then swap in the evaluated object.
    @serialized_accum_param = @accum_param
    @accum_param = eval(@accum_param)

    raise Spark::SerializeError, 'Yours param is not a Proc.' unless @accum_param.is_a?(Proc)
  else
    raise Spark::AccumulatorError, 'Unsupported param. Use Symbol, Proc or String.'
  end

  nil
end