Class: ScbiMapreduce::WorkManager

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
EM::P::ObjectProtocol
Defined in:
lib/scbi_mapreduce/work_manager.rb

Overview

require 'json'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ WorkManager

Returns a new instance of WorkManager.



473
474
475
476
# File 'lib/scbi_mapreduce/work_manager.rb', line 473

# Builds the handler for one worker connection.
#
# Every argument is forwarded untouched to the superclass constructor
# (the page lists EventMachine::Connection as the superclass).
def initialize(*args)
  super(*args)
end

Class Method Details

.checkpoint ⇒ Object



168
169
170
# File 'lib/scbi_mapreduce/work_manager.rb', line 168

# Returns the checkpoint number recorded by init_work_manager_internals
# (0 when checkpointing is off); goto_checkpoint resets it to 0 after use.
def self.checkpoint
  @@checkpoint
end

.controlled_exit ⇒ Object



356
357
358
359
# File 'lib/scbi_mapreduce/work_manager.rb', line 356

# Requests a graceful shutdown.
#
# Only a flag is set here: send_next_work checks it and answers the next
# request of every worker with :quit instead of new work.
def self.controlled_exit
  $SERVER_LOG.info("Controlled exit. Workers will be noticed in next round")
  @@want_to_exit = true
end

.end_work_manager ⇒ Object



92
93
94
# File 'lib/scbi_mapreduce/work_manager.rb', line 92

# Class-level hook run by stop_work_manager right after the reactor is
# stopped. Subclasses may override it; the default does nothing.
def self.end_work_manager
  nil
end

.get_checkpoint ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/scbi_mapreduce/work_manager.rb', line 198

# Reads the last saved checkpoint number from CHECKPOINT_FILE.
#
# Returns the integer stored in the file (String#to_i semantics), or 0 when
# the file does not exist or cannot be read.
#
# File.exist? is used because File.exists? was removed in Ruby 3.2. There the
# NoMethodError was swallowed by the rescue and the checkpoint silently read
# as 0, so checkpoint restoration never happened.
def self.get_checkpoint
  return 0 unless File.exist?(CHECKPOINT_FILE)

  File.read(CHECKPOINT_FILE).chomp.to_i
rescue StandardError
  # best effort: an unreadable checkpoint means "start from the beginning"
  0
end

.init_work_manager ⇒ Object



88
89
90
# File 'lib/scbi_mapreduce/work_manager.rb', line 88

# Class-level initialization hook for subclasses. It is not called by the
# code visible here, and the default does nothing.
def self.init_work_manager
  nil
end

.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/scbi_mapreduce/work_manager.rb', line 136

# Resets the shared class-level state for a new run and stores the
# configuration flags.
#
# checkpointing       - when truthy, a saved checkpoint is read via get_checkpoint
# keep_order          - results are tracked so they are saved in submission order
# retry_stuck_jobs    - stuck jobs are re-sent by send_stuck_work
# exit_on_many_errors - a controlled exit is requested when errors pile up
# chunk_size          - number of objects pulled from next_work per job
def self.init_work_manager_internals(checkpointing, keep_order, retry_stuck_jobs, exit_on_many_errors, chunk_size)
  # run counters and job bookkeeping
  @@count = 0
  @@want_to_exit = false
  @@chunk_count = 0
  @@workers = 0
  @@max_workers = 0
  @@error_count = 0
  @@running_jobs = []

  # configuration
  @@checkpointing = checkpointing
  @@keep_order = keep_order
  @@retry_stuck_jobs = retry_stuck_jobs
  @@exit_on_many_errors = exit_on_many_errors
  # TODO: implement a dynamic chunk_size
  @@chunk_size = chunk_size

  $SERVER_LOG.info "Processing in chunks of #{@@chunk_size} objects"
  $SERVER_LOG.info "Checkpointing: #{@@checkpointing}"
  $SERVER_LOG.info "Keeping output order: #{@@keep_order}"
  $SERVER_LOG.info "Retrying stuck jobs: #{@@retry_stuck_jobs}"
  $SERVER_LOG.info "Exiting on too many errors: #{@@exit_on_many_errors}"

  @@checkpoint = @@checkpointing ? get_checkpoint : 0
  $SERVER_LOG.info "Detected checkpoint at #{@@checkpoint}" if @@checkpointing
end

Instance Method Details

#checkpointable_job_received(obj) ⇒ Object



404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/scbi_mapreduce/work_manager.rb', line 404

# Handles a finished job while jobs are tracked in @@running_jobs.
#
# The matching tracked job is marked as received. Results are then handed to
# work_received strictly in submission order. This happens only for the
# leading run of received jobs, and only once that run reaches PENDING_TO_SAVE
# or covers every tracked job. Saved jobs are dropped from @@running_jobs.
#
# With checkpointing on, the old checkpoint file is moved aside before
# flushing. A fresh one is written afterwards, unless an exit was requested.
# Results for jobs no longer tracked are only logged as a warning.
def checkpointable_job_received(obj)
  received_job = @@running_jobs.find { |job| job.job_identifier == obj.job_identifier }

  if received_job.nil?
    $SERVER_LOG.warn "Job already processed #{obj.inspect}"
  else
    received_job.received!(obj.data)

    # leading run of finished jobs; stops at the first job still in flight
    ready = @@running_jobs.take_while { |job| job.status == :received }

    # flush when enough are pending, or when nothing else is outstanding
    if ready.size >= PENDING_TO_SAVE || ready.size == @@running_jobs.size
      remove_checkpoint if @@checkpointing

      ready.each do |job|
        work_received(job.data)
        job.status = :saved
      end

      unless ready.empty?
        @@running_jobs.shift(ready.size)
        save_checkpoint if @@checkpointing && !@@want_to_exit
      end
    end
  end
end

#error_received(worker_error, obj) ⇒ Object



108
109
110
# File 'lib/scbi_mapreduce/work_manager.rb', line 108

# Hook called by receive_object with the worker error and the data of the
# object that failed. Subclasses may override it; the default does nothing.
def error_received(worker_error, obj)
  nil
end

#goto_checkpoint ⇒ Object

Loads a checkpoint.



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/scbi_mapreduce/work_manager.rb', line 300

# Restores processing state from @@checkpoint, if one was detected.
#
# load_user_checkpoint decides how the restore happens:
# * -1 - automatic skip: (@@checkpoint - 1) chunks are pulled from next_work
#        and discarded, then WorkManagerData.job_id is set to @@checkpoint.
# * >0 - the user restored state; WorkManagerData.job_id is set to that value.
# *  0 - nothing is restored, only a message is logged.
# Afterwards @@checkpoint is reset to 0, so later calls do nothing.
def goto_checkpoint
  return unless @@checkpoint > 0

  $SERVER_LOG.info "Skipping until checkpoint #{@@checkpoint}"
  restored = load_user_checkpoint(@@checkpoint)

  if restored == -1
    1.upto(@@checkpoint - 1) do |chunk_number|
      $SERVER_LOG.info "Automatic trashing Chunk #{chunk_number}"
      # read and drop one chunk worth of objects
      @@chunk_size.times { next_work }
    end
    $SERVER_LOG.info "Automatic checkpoint finished"
    WorkManagerData.job_id = @@checkpoint
  elsif restored > 0
    # user has done the checkpoint restoration
    WorkManagerData.job_id = restored
  elsif restored == 0
    $SERVER_LOG.info "Automatic checkpoint not done"
  end

  @@checkpoint = 0
end

#load_user_checkpoint(checkpoint) ⇒ Object

If this function returns -1, automatic checkpoint restoration is performed. Return 0 to skip checkpoint restoration, or return the restored checkpoint number to resume from that point.



123
124
125
# File 'lib/scbi_mapreduce/work_manager.rb', line 123

# Hook used by goto_checkpoint to restore user state up to +checkpoint+.
# The default answer, -1, selects automatic restoration, which discards the
# already processed chunks.
def load_user_checkpoint(checkpoint)
  -1
end

#next_work ⇒ Object



96
97
98
# File 'lib/scbi_mapreduce/work_manager.rb', line 96

# Hook returning the next input object. send_next_work treats nil as "no more
# input". The default returns nil, so subclasses must override it.
def next_work
  nil
end

#post_init ⇒ Object



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/scbi_mapreduce/work_manager.rb', line 337

# Runs when a worker connects (EventMachine::Connection post_init hook).
#
# Counts the worker. The first worker to connect also starts the run timer
# and, when checkpointing is on, restores any saved checkpoint. Each worker
# then receives its initial configuration followed by its first job.
def post_init
  @@max_workers += 1
  @@workers += 1

  first_worker = (@@workers == 1)
  if first_worker
    @@total_seconds = Time.now
    $SERVER_LOG.info "First worker connected"
  end
  if first_worker && @@checkpointing
    $SERVER_LOG.info "Checking for checkpoint"
    goto_checkpoint
  end

  $SERVER_LOG.info "#{@@workers} workers connected"
  send_initial_config
  send_next_work
end


226
227
228
229
# File 'lib/scbi_mapreduce/work_manager.rb', line 226

# Logs every tracked job, one inspect line each, at debug level.
def print_running_jobs
  listing = @@running_jobs.map(&:inspect).join("\n")
  $SERVER_LOG.debug("Running Jobs:\n#{listing}")
end

#read_until_checkpoint(checkpoint) ⇒ Object



116
117
118
# File 'lib/scbi_mapreduce/work_manager.rb', line 116

# Subclass hook. It is not called by the code visible here, and the default
# does nothing.
def read_until_checkpoint(checkpoint)
  nil
end

#receive_object(obj) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/scbi_mapreduce/work_manager.rb', line 362

# Handles an object sent back by a worker (EM::P::ObjectProtocol callback),
# then sends that worker its next job.
#
# obj is one of two things:
# * An Exception-derived worker error responding to worker_id, object and
#   original_exception. It is logged, counted and passed to error_received.
# * A finished job whose data goes to work_received. It goes through
#   checkpointable_job_received whenever jobs are tracked (checkpointing,
#   keep_order or retry_stuck_jobs).
#
# Once more than 100 objects have been dispatched and errors reach 80% of that
# count, too_many_errors_received is consulted. A truthy answer, or the
# exit_on_many_errors flag, requests a controlled exit. controlled_exit is a
# class method, so it must be called on self.class; calling it on the instance
# raised NoMethodError.
def receive_object(obj)
  if obj.is_a?(Exception)
    # Array() guards against an exception that carries no backtrace
    $SERVER_LOG.error("Error in worker #{obj.worker_id} while processing object #{obj.object.inspect}\n" + obj.original_exception.message + ":\n" + Array(obj.original_exception.backtrace).join("\n"))

    @@error_count += 1

    error_received(obj, obj.object.data)

    # too many errors compared with the number of dispatched objects
    if (@@count > 100) && (@@error_count >= @@count * 0.8)
      # let the programmer decide; a truthy result forces an exit
      res = too_many_errors_received

      if @@exit_on_many_errors || res
        $SERVER_LOG.error("Want to exit due to too many errors")
        self.class.controlled_exit
      end
    end
  elsif @@checkpointing || @@keep_order || @@retry_stuck_jobs
    checkpointable_job_received(obj)
  else
    work_received(obj.data)
  end

  send_next_work
end

#remove_checkpoint ⇒ Object



172
173
174
175
176
# File 'lib/scbi_mapreduce/work_manager.rb', line 172

# Moves the current checkpoint file aside to OLD_CHECKPOINT_FILE, if present.
#
# Uses File.exist?, because File.exists? was removed in Ruby 3.2 and raised
# NoMethodError here.
def remove_checkpoint
  FileUtils.mv(CHECKPOINT_FILE, OLD_CHECKPOINT_FILE) if File.exist?(CHECKPOINT_FILE)
end

#save_checkpoint ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/scbi_mapreduce/work_manager.rb', line 179

# Writes the current checkpoint to CHECKPOINT_FILE, then calls the
# save_user_checkpoint hook.
#
# The checkpoint is the identifier of the oldest job still tracked in
# @@running_jobs. When no job is tracked, it falls back to
# WorkManagerData.job_id. The block form of File.open closes the file even
# when the write raises; the previous version leaked the handle in that case.
def save_checkpoint
  checkpoint_value =
    if @@running_jobs.empty?
      WorkManagerData.job_id
    else
      @@running_jobs.first.job_identifier
    end

  $SERVER_LOG.info "Saving checkpoint: #{checkpoint_value}"

  File.open(CHECKPOINT_FILE, 'w') { |file| file.puts checkpoint_value }

  save_user_checkpoint
end

#save_user_checkpoint ⇒ Object



127
128
# File 'lib/scbi_mapreduce/work_manager.rb', line 127

# Hook called at the end of save_checkpoint so subclasses can persist their
# own state; the default does nothing.
def save_user_checkpoint
  nil
end

#send_initial_config ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
# File 'lib/scbi_mapreduce/work_manager.rb', line 214

# Sends the worker its start-up configuration.
#
# When worker_initial_config returns a value, the message is
# {:initial_config => value}. When it returns nil, the :no_initial_config
# marker is sent instead.
def send_initial_config
  config = worker_initial_config
  message = config.nil? ? :no_initial_config : { :initial_config => config }
  send_object(message)
end

#send_next_work ⇒ Object

Sends the next work to the worker.



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/scbi_mapreduce/work_manager.rb', line 254

# Sends the next message to the worker on this connection.
#
# Priority order:
# 1. :quit if a controlled exit was requested.
# 2. A stuck job, if send_stuck_work re-sent one.
# 3. A new job of up to @@chunk_size objects pulled from next_work.
# 4. :quit once next_work has nothing left.
#
# New jobs are tracked in @@running_jobs whenever receive_object will route
# their results through checkpointable_job_received, i.e. for checkpointing,
# keep_order or retry_stuck_jobs. Previously checkpointing alone did not track
# jobs, so every result was rejected as "Job already processed" and never saved.
def send_next_work
  if @@want_to_exit
    send_object(:quit)
    return
  end

  return if send_stuck_work

  # prepare new data
  objs = []
  @@chunk_size.times do
    obj = next_work
    break if obj.nil?

    objs << obj
  end

  if objs.empty?
    # no more data available
    send_object(:quit)
    return
  end

  @@count += objs.count
  @@chunk_count += 1

  work_data = WorkManagerData.new(objs)
  send_object(work_data)

  # keep the job (with its data) so it can be ordered, checkpointed or re-sent
  @@running_jobs.push(work_data) if @@checkpointing || @@keep_order || @@retry_stuck_jobs
end

#send_stuck_work ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/scbi_mapreduce/work_manager.rb', line 231

# Re-sends the first stuck tracked job when retry_stuck_jobs is enabled.
# Returns true when a stuck job was sent, false otherwise.
def send_stuck_work
  return false unless @@retry_stuck_jobs

  stuck = @@running_jobs.select { |job| job.stuck? }
  return false if stuck.empty?

  listing = stuck.map { |job| job.inspect }.join("\n")
  $SERVER_LOG.info("Stuck Jobs:\n#{listing}")

  retried = stuck.first
  send_object(retried)
  retried.sent!
  $SERVER_LOG.info("Sending stuck work #{retried.inspect}")
  true
end

#stop_work_manager ⇒ Object



493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# File 'lib/scbi_mapreduce/work_manager.rb', line 493

# Shuts the server down and reports run statistics.
#
# Stops the EventMachine reactor and runs the end_work_manager class hook.
# It then logs the elapsed time since the first worker connected, throughput,
# error count, chunk size and total number of workers seen.
def stop_work_manager
  EM.stop
  $SERVER_LOG.info "Exiting server"

  self.class.end_work_manager

  # from here on @@total_seconds holds the elapsed time, not the start time
  @@total_seconds = Time.now - @@total_seconds
  elapsed = @@total_seconds.to_f

  $SERVER_LOG.info "Total processed: #{@@count} objects in #{@@total_seconds} seconds"
  $SERVER_LOG.info "Processing rate: #{format('%.2f', @@count / elapsed)} objects per second"
  $SERVER_LOG.info "Connection rate: #{format('%.2f', @@chunk_count / elapsed)} connections per second"

  $SERVER_LOG.info "Number of errors: #{@@error_count}"
  $SERVER_LOG.info "Chunk size: #{@@chunk_size}"
  $SERVER_LOG.info "Total connected workers: #{@@max_workers}"
end

#too_many_errors_received ⇒ Object



112
113
114
# File 'lib/scbi_mapreduce/work_manager.rb', line 112

# Hook consulted by receive_object when the error ratio gets too high.
# A truthy return value forces a controlled exit. The default returns nil, so
# only the exit_on_many_errors flag decides.
def too_many_errors_received
  nil
end

#trash_checkpointed_work ⇒ Object



130
131
132
# File 'lib/scbi_mapreduce/work_manager.rb', line 130

# Subclass hook. Its only reference in the visible code is a commented-out
# call in goto_checkpoint, and the default does nothing.
def trash_checkpointed_work
  nil
end

#unbind ⇒ Object

A worker has disconnected.



479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'lib/scbi_mapreduce/work_manager.rb', line 479

# Runs when a worker connection closes (EventMachine::Connection unbind hook).
# When the last connected worker goes away, the work manager is stopped.
def unbind
  @@workers -= 1
  $SERVER_LOG.info "Worker disconnected. #{@@workers} kept running"

  # other workers are still running; nothing more to do
  return unless @@workers.zero?

  $SERVER_LOG.info "All workers finished"
  stop_work_manager
end

#work_received(obj) ⇒ Object



100
101
102
# File 'lib/scbi_mapreduce/work_manager.rb', line 100

# Hook receiving the data of a finished job. It is called directly by
# receive_object, or in submission order by checkpointable_job_received.
# The default does nothing.
def work_received(obj)
  nil
end

#worker_initial_config ⇒ Object



104
105
106
# File 'lib/scbi_mapreduce/work_manager.rb', line 104

# Hook providing the configuration sent to each new worker. Returning nil,
# the default, makes send_initial_config send :no_initial_config.
def worker_initial_config
  nil
end