Class: ScbiMapreduce::WorkManager

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
EM::P::ObjectProtocol
Defined in:
lib/scbi_mapreduce/work_manager.rb

Overview

require ‘json’

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ WorkManager

Returns a new instance of WorkManager.



372
373
374
375
# File 'lib/scbi_mapreduce/work_manager.rb', line 372

# Creates the work manager by handing every argument (and any block)
# to the superclass constructor through a bare +super+. No additional
# state is initialised here.
#
# NOTE(review): the disabled debug trace below suggests that one instance
# is created per connected worker - confirm against the server setup.
def initialize(*args)
  super
  #puts "WORK MANAGER INITIALIZE NEWWWWWWWWWW, ONE per worker"
end

Class Method Details

.checkpointObject



132
133
134
# File 'lib/scbi_mapreduce/work_manager.rb', line 132

# Exposes the checkpoint value stored in the class-level @@checkpoint
# variable (set by init_work_manager_internals, reset to 0 by
# goto_checkpoint).
def self.checkpoint
  @@checkpoint
end

.end_work_managerObject



61
62
63
# File 'lib/scbi_mapreduce/work_manager.rb', line 61

# Class-level hook with an empty default body.
# It is invoked from #unbind (via self.class.end_work_manager) once the
# last worker has disconnected and the event loop has been asked to stop.
def self.end_work_manager

end

.get_checkpointObject



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/scbi_mapreduce/work_manager.rb', line 151

# Reads the last saved checkpoint from the 'scbi_mapreduce_checkpoint'
# file in the current working directory.
#
# Returns the file content converted with String#to_i, or 0 when the file
# does not exist or cannot be read (best-effort: any StandardError while
# reading is treated as "no checkpoint").
#
# Uses File.exist? instead of File.exists?: the latter was removed in
# Ruby 3.2, and the resulting NoMethodError used to be swallowed by the
# rescue clause, making this method silently return 0 every time.
def self.get_checkpoint
  checkpoint_path = 'scbi_mapreduce_checkpoint'
  return 0 unless File.exist?(checkpoint_path)

  File.read(checkpoint_path).chomp.to_i
rescue StandardError
  0
end

.init_work_managerObject



57
58
59
# File 'lib/scbi_mapreduce/work_manager.rb', line 57

# Class-level hook with an empty default body.
# It is not called from any of the methods shown in this section.
# NOTE(review): presumably meant to be overridden for one-time setup
# before workers connect - confirm against the caller.
def self.init_work_manager

end

.init_work_manager_internals(checkpointing, keep_order, retry_failed_jobs, exit_on_many_errors, chunk_size) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/scbi_mapreduce/work_manager.rb', line 105

# Resets every class-level counter and stores the processing options.
#
# checkpointing       - when truthy, the saved checkpoint is loaded through
#                       get_checkpoint and stored in @@checkpoint
# keep_order          - stored in @@keep_order
# retry_failed_jobs   - stored in @@retry_failed_jobs
# exit_on_many_errors - stored in @@exit_on_many_errors
# chunk_size          - number of objects requested per chunk in send_next_work
def self.init_work_manager_internals(checkpointing, keep_order, retry_failed_jobs, exit_on_many_errors, chunk_size)
  # statistics and bookkeeping all start from scratch
  @@count = @@chunk_count = @@workers = @@max_workers = @@error_count = 0
  @@running_jobs = []

  # processing options
  @@checkpointing = checkpointing
  @@keep_order = keep_order
  @@retry_failed_jobs = retry_failed_jobs
  @@exit_on_many_errors = exit_on_many_errors

  # TODO - Implement a dynamic chunk_size
  @@chunk_size = chunk_size
  $SERVER_LOG.info "Processing in chunks of #{@@chunk_size} objects"

  # without checkpointing the checkpoint simply stays at 0
  @@checkpoint = 0
  return unless @@checkpointing

  @@checkpoint = get_checkpoint
  $SERVER_LOG.info "Detected checkpoint at #{@@checkpoint}"
end

Instance Method Details

#checkpointable_job_received(obj) ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/scbi_mapreduce/work_manager.rb', line 316

# Handles a finished job when results have to be tracked through
# @@running_jobs.
#
# The matching tracked job (same job_identifier) receives the result data
# and is flagged :received. Once at least PENDING_TO_SAVE jobs are tracked,
# the leading run of :received jobs is counted; when that run is longer
# than PENDING_TO_SAVE, or covers the whole list, each of those jobs is
# handed to work_received in list order, flagged :saved, removed from the
# list, and a checkpoint is saved.
#
# A result whose job is no longer tracked is only logged.
def checkpointable_job_received(obj)
  tracked = @@running_jobs.find { |job| job.job_identifier == obj.job_identifier }
  return $SERVER_LOG.info("Job already processed #{obj.inspect}") unless tracked

  # store the result on the tracked job
  tracked.data = obj.data
  tracked.status = :received

  # not enough tracked jobs yet to bother flushing
  return if @@running_jobs.count < PENDING_TO_SAVE

  # length of the leading run of jobs whose results have arrived
  pending = @@running_jobs.take_while { |job| job.status == :received }.size
  return unless pending > PENDING_TO_SAVE || pending == @@running_jobs.count

  # hand the contiguous received results over, stopping at the first gap
  flushed = 0
  @@running_jobs.each do |job|
    break unless job.status == :received

    work_received(job.data)
    job.status = :saved
    flushed += 1
  end

  return unless flushed > 0

  # drop the saved jobs from the head of the list and persist progress
  @@running_jobs.shift(flushed)
  save_checkpoint
end

#error_received(worker_error, obj) ⇒ Object



77
78
79
# File 'lib/scbi_mapreduce/work_manager.rb', line 77

# Hook with an empty default body, called by #receive_object whenever a
# worker answers with an exception.
#
# worker_error - the exception object received from the worker
# obj          - the data of the object that was being processed
#                (receive_object passes obj.object.data)
def error_received(worker_error, obj)

end

#goto_checkpointObject



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/scbi_mapreduce/work_manager.rb', line 215

# Restores the position recorded in @@checkpoint, if there is one.
#
# load_user_checkpoint decides how the restore happens:
#   -1 : automatic restore - trash_checkpointed_work is called @@checkpoint
#        times and WorkManagerData.job_id is set to @@checkpoint
#   >0 : the user restored the state; WorkManagerData.job_id is set to the
#        returned value
#    0 : nothing is restored, only a log line is written
#
# @@checkpoint is reset to 0 afterwards. Nothing happens when @@checkpoint
# is not greater than 0.
def goto_checkpoint
  return unless @@checkpoint > 0

  $SERVER_LOG.info "Skipping until checkpoint #{@@checkpoint}"

  restored = load_user_checkpoint(@@checkpoint)

  if restored == -1
    # automatic restore: discard one unit of work per checkpointed job
    @@checkpoint.times { trash_checkpointed_work }

    $SERVER_LOG.info "Automatic checkpoint finished"

    WorkManagerData.job_id = @@checkpoint
  elsif restored > 0
    # the user code already restored its own state
    WorkManagerData.job_id = restored
  elsif restored == 0
    $SERVER_LOG.info "Automatic checkpoint not done"
  end

  @@checkpoint = 0
end

#load_user_checkpoint(checkpoint) ⇒ Object

If this function returns -1, the automatic checkpoint restoration is performed. Return 0 to skip checkpoint restoration. Return the restored checkpoint number to resume from that point.



92
93
94
# File 'lib/scbi_mapreduce/work_manager.rb', line 92

# Hook consulted by goto_checkpoint to decide how a checkpoint is restored.
#
# checkpoint - the checkpoint number read at start-up
#
# The default answer is -1, which makes goto_checkpoint perform the
# automatic restore. goto_checkpoint treats 0 as "do not restore" and a
# positive number as the job id to resume from.
def load_user_checkpoint(checkpoint)
  -1
end

#next_workObject



65
66
67
# File 'lib/scbi_mapreduce/work_manager.rb', line 65

# Hook that supplies the next object to process.
#
# send_next_work calls it up to @@chunk_size times per chunk and stops
# collecting at the first nil. The default returns nil, so no work is
# ever produced unless this method is overridden.
def next_work
  nil
end

#post_initObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/scbi_mapreduce/work_manager.rb', line 256

# Runs when a new worker connection is set up.
#
# Updates the connected and total worker counters. For the very first
# worker it also stores the start time in @@total_seconds and, when
# checkpointing is enabled, restores the checkpoint. Every worker then
# receives the initial configuration followed by its first chunk of work.
def post_init
  @@workers += 1
  @@max_workers += 1

  first_worker = (@@workers == 1)

  if first_worker
    # start time; #unbind later replaces it with the elapsed seconds
    @@total_seconds = Time.now
    $SERVER_LOG.info "First worker connected"
  end

  if first_worker && @@checkpointing
    $SERVER_LOG.info "Checking for checkpoint"
    goto_checkpoint
  end

  $SERVER_LOG.info "#{@@workers} workers connected"
  send_initial_config
  send_next_work
end

#read_until_checkpoint(checkpoint) ⇒ Object



85
86
87
# File 'lib/scbi_mapreduce/work_manager.rb', line 85

# Hook with an empty default body.
# It is not called from any of the methods shown in this section.
#
# checkpoint - NOTE(review): presumably the checkpoint number to advance
#              to - confirm against the caller.
def read_until_checkpoint(checkpoint)

end

#receive_object(obj) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/scbi_mapreduce/work_manager.rb', line 276

# Processes an object sent back by a worker, then sends more work.
#
# When the object is an Exception the failure is logged, counted and passed
# to error_received. Once more than 100 objects have been sent and at least
# 80% of that count are errors, @@exit takes the value of
# @@exit_on_many_errors and too_many_errors_received is called; a return
# value of exactly true forces @@exit to true.
#
# Any other object is a result: it goes through checkpointable_job_received
# when checkpointing, ordering or retrying is enabled, and straight to
# work_received otherwise.
def receive_object(obj)
  if obj.is_a?(Exception)
    failure = obj.original_exception
    report = "Error in worker #{obj.worker_id} while processing object #{obj.object.inspect}\n" +
             failure.message + ":\n" + failure.backtrace.join("\n")
    $SERVER_LOG.error(report)

    @@error_count += 1

    error_received(obj, obj.object.data)

    # too many errors: let the user code decide whether to force the exit
    if @@count > 100 && @@error_count >= @@count * 0.8
      @@exit = @@exit_on_many_errors
      @@exit = true if too_many_errors_received == true
    end
  elsif @@checkpointing || @@keep_order || @@retry_failed_jobs
    checkpointable_job_received(obj)
  else
    work_received(obj.data)
  end

  send_next_work
end

#save_checkpointObject



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/scbi_mapreduce/work_manager.rb', line 136

# Writes the current checkpoint to the 'scbi_mapreduce_checkpoint' file and
# then calls save_user_checkpoint.
#
# The stored value is the job_identifier of the first job still tracked in
# @@running_jobs, or WorkManagerData.job_id - 1 when no job is tracked.
#
# The value is computed before the file is opened, so a failure while
# computing it no longer leaves a truncated checkpoint file behind. The
# block form of File.open guarantees the handle is closed even when the
# write raises.
def save_checkpoint
  checkpoint_value = if @@running_jobs.empty?
                       WorkManagerData.job_id - 1
                     else
                       @@running_jobs.first.job_identifier
                     end

  File.open('scbi_mapreduce_checkpoint', 'w') do |checkpoint_file|
    checkpoint_file.puts checkpoint_value
  end

  save_user_checkpoint
end

#save_user_checkpointObject



96
97
# File 'lib/scbi_mapreduce/work_manager.rb', line 96

# Hook with an empty default body, called by #save_checkpoint right after
# the checkpoint file has been written and closed.
# NOTE(review): presumably the place to persist user state that belongs
# to the checkpoint - confirm intended use.
def save_user_checkpoint
end

#send_initial_configObject



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/scbi_mapreduce/work_manager.rb', line 167

# Sends the worker its initial configuration.
#
# The value returned by worker_initial_config is wrapped in a hash under
# the :initial_config key. When that value is nil the symbol
# :no_initial_config is sent instead.
def send_initial_config
  config = worker_initial_config
  message = config.nil? ? :no_initial_config : { :initial_config => config }

  send_object(message)
end

#send_next_workObject

send next work to worker



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/scbi_mapreduce/work_manager.rb', line 180

# Sends the next chunk of work to the worker.
#
# Up to @@chunk_size objects are pulled from next_work, stopping at the
# first nil. When at least one object was collected, the counters are
# updated and the chunk is sent wrapped in a WorkManagerData. When nothing
# is left, :quit is sent instead.
#
# The sent chunk is remembered in @@running_jobs whenever results will be
# routed through checkpointable_job_received, i.e. when checkpointing,
# keep_order or retry_failed_jobs is enabled. Checkpointing used to be
# missing from this condition: with checkpointing alone, results were never
# found in @@running_jobs, were logged as "Job already processed" and never
# reached work_received.
def send_next_work
  objs = []

  @@chunk_size.times do
    obj = next_work
    break if obj.nil?

    objs << obj
  end

  if objs.empty?
    send_object(:quit)
  else
    @@count += objs.count
    @@chunk_count += 1

    work_data = WorkManagerData.new(objs)

    send_object(work_data)

    # job status is needed to checkpoint, keep order or retry failed jobs
    @@running_jobs.push work_data if @@checkpointing || @@keep_order || @@retry_failed_jobs
  end
end

#too_many_errors_receivedObject



81
82
83
# File 'lib/scbi_mapreduce/work_manager.rb', line 81

# Hook called by receive_object when more than 100 objects have been sent
# and at least 80% of that count came back as errors.
#
# Returning exactly true forces @@exit to true. The default returns nil,
# which leaves @@exit at the value of @@exit_on_many_errors.
def too_many_errors_received
  nil
end

#trash_checkpointed_workObject



99
100
101
# File 'lib/scbi_mapreduce/work_manager.rb', line 99

# Hook with an empty default body, called by #goto_checkpoint once per
# checkpointed job (@@checkpoint times) during an automatic restore.
# NOTE(review): presumably an override should consume and discard one unit
# of input so that processing resumes after the checkpoint - confirm.
def trash_checkpointed_work

end

#unbindObject

A worker has disconnected



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/scbi_mapreduce/work_manager.rb', line 378

# Runs when a worker connection is closed.
#
# Decrements the connected worker counter. When the last worker leaves, the
# EventMachine loop is stopped, the end_work_manager class hook is called
# and the run statistics are logged.
def unbind
  @@workers -= 1

  $SERVER_LOG.info  "Worker disconnected. #{@@workers} kept running"

  # other workers are still connected: nothing else to do
  return unless @@workers == 0

  $SERVER_LOG.info  "All workers finished"
  EM.stop
  $SERVER_LOG.info  "Exiting server"

  self.class.end_work_manager

  # @@total_seconds held the start time set in post_init; from here on it
  # holds the elapsed time
  @@total_seconds = Time.now - @@total_seconds
  elapsed = @@total_seconds.to_f

  $SERVER_LOG.info  "Total processed: #{@@count} objects in #{@@total_seconds} seconds"
  $SERVER_LOG.info  "Processing rate: #{format('%.2f', @@count / elapsed)} objects per second"
  $SERVER_LOG.info  "Connection rate: #{format('%.2f', @@chunk_count / elapsed)} connections per second"

  $SERVER_LOG.info  "Number of errors: #{@@error_count}"
  $SERVER_LOG.info  "Chunk size: #{@@chunk_size}"
  $SERVER_LOG.info  "Total connected workers: #{@@max_workers}"
end

#work_received(obj) ⇒ Object



69
70
71
# File 'lib/scbi_mapreduce/work_manager.rb', line 69

# Hook with an empty default body that receives the result of a finished
# job.
#
# obj - the result data. receive_object passes obj.data directly;
#       checkpointable_job_received passes job.data for each contiguous
#       received job, in @@running_jobs order.
def work_received(obj)

end

#worker_initial_configObject



73
74
75
# File 'lib/scbi_mapreduce/work_manager.rb', line 73

# Hook that supplies the configuration sent to each worker when it
# connects (see send_initial_config).
#
# The default returns nil, which makes send_initial_config send
# :no_initial_config. Any other value is sent under the :initial_config key.
def worker_initial_config
  nil
end