Class: Scheduler::Manager::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/scheduler/manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(manager) ⇒ Runner

Returns a new instance of Runner.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/scheduler/manager.rb', line 14

def initialize(manager)
  @mutex = Mutex.new
  @queue = Queue.new
  @manager = manager
  @reschedule_orphans_thread = Thread.new do
    while true
      sleep 1.minute
      @mutex.synchronize do
        reschedule_orphans
      end
    end
  end
  @keep_alive_thread = Thread.new do
    while true
      @mutex.synchronize do
        keep_alive
      end
      sleep (@manager.keep_alive_duration / 2)
    end
  end
  @thread = Thread.new do
    while true
      process_queue
    end
  end
end

Instance Method Details

#attempts(n) ⇒ Object



104
105
106
107
108
109
110
111
112
# File 'lib/scheduler/manager.rb', line 104

def attempts(n)
  n.times {
    begin
      yield; break
    rescue
      sleep Random.rand
    end
  }
end

#enq(klass) ⇒ Object



89
90
91
# File 'lib/scheduler/manager.rb', line 89

def enq(klass)
  @queue << klass
end

#keep_aliveObject



41
42
43
44
45
# File 'lib/scheduler/manager.rb', line 41

def keep_alive
  @manager.keep_alive
rescue => ex
  Scheduler.handle_job_exception(ex, {message: "Scheduling manager keep-alive"})
end

#process_queueObject



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/scheduler/manager.rb', line 53

def process_queue
  klass = @queue.deq
  # hack alert, I need to both deq and set @running atomically.
  @running = true
  failed = false
  start = Time.now.to_f
  info = @mutex.synchronize { @manager.schedule_info(klass) }
  begin
    info.prev_result = "RUNNING"
    @mutex.synchronize { info.write! }
    klass.new.perform
  rescue => e
    Scheduler.handle_job_exception(e, {message: "Running a scheduled job", job: klass})
    failed = true
  end
  duration = ((Time.now.to_f - start) * 1000).to_i
  info.prev_duration = duration
  info.prev_result = failed ? "FAILED" : "OK"
  info.current_owner = nil
  attempts(3) do
    @mutex.synchronize { info.write! }
  end
rescue => ex
  Scheduler.handle_job_exception(ex, {message: "Processing scheduled job queue"})
ensure
  @running = false
end

#reschedule_orphansObject



47
48
49
50
51
# File 'lib/scheduler/manager.rb', line 47

def reschedule_orphans
  @manager.reschedule_orphans!
rescue => ex
  Scheduler.handle_job_exception(ex, {message: "Scheduling manager orphan rescheduler"})
end

#stop!Object



81
82
83
84
85
86
87
# File 'lib/scheduler/manager.rb', line 81

def stop!
  @mutex.synchronize do
    @thread.kill
    @keep_alive_thread.kill
    @reschedule_orphans_thread.kill
  end
end

#wait_till_doneObject



93
94
95
96
97
98
99
100
101
102
# File 'lib/scheduler/manager.rb', line 93

def wait_till_done
  while !@queue.empty? && !(@queue.num_waiting > 0)
    sleep 0.001
  end
  # this is a hack, but is only used for test anyway
  sleep 0.001
  while @running
    sleep 0.001
  end
end