Class: Scheduler::Manager::Runner
- Inherits:
-
Object
- Object
- Scheduler::Manager::Runner
- Defined in:
- lib/scheduler/manager.rb
Instance Method Summary collapse
- #attempts(n) ⇒ Object
- #enq(klass) ⇒ Object
-
#initialize(manager) ⇒ Runner
constructor
A new instance of Runner.
- #keep_alive ⇒ Object
- #process_queue ⇒ Object
- #reschedule_orphans ⇒ Object
- #stop! ⇒ Object
- #wait_till_done ⇒ Object
Constructor Details
#initialize(manager) ⇒ Runner
Returns a new instance of Runner.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/scheduler/manager.rb', line 14 def initialize(manager) @mutex = Mutex.new @queue = Queue.new @manager = manager @reschedule_orphans_thread = Thread.new do while true sleep 1.minute @mutex.synchronize do reschedule_orphans end end end @keep_alive_thread = Thread.new do while true @mutex.synchronize do keep_alive end sleep (@manager.keep_alive_duration / 2) end end @thread = Thread.new do while true process_queue end end end |
Instance Method Details
#attempts(n) ⇒ Object
104 105 106 107 108 109 110 111 112 |
# File 'lib/scheduler/manager.rb', line 104 def attempts(n) n.times { begin yield; break rescue sleep Random.rand end } end |
#enq(klass) ⇒ Object
89 90 91 |
# File 'lib/scheduler/manager.rb', line 89 def enq(klass) @queue << klass end |
#keep_alive ⇒ Object
41 42 43 44 45 |
# File 'lib/scheduler/manager.rb', line 41 def keep_alive @manager.keep_alive rescue => ex Scheduler.handle_job_exception(ex, {message: "Scheduling manager keep-alive"}) end |
#process_queue ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/scheduler/manager.rb', line 53 def process_queue klass = @queue.deq # hack alert, I need to both deq and set @running atomically. @running = true failed = false start = Time.now.to_f info = @mutex.synchronize { @manager.schedule_info(klass) } begin info.prev_result = "RUNNING" @mutex.synchronize { info.write! } klass.new.perform rescue => e Scheduler.handle_job_exception(e, {message: "Running a scheduled job", job: klass}) failed = true end duration = ((Time.now.to_f - start) * 1000).to_i info.prev_duration = duration info.prev_result = failed ? "FAILED" : "OK" info.current_owner = nil attempts(3) do @mutex.synchronize { info.write! } end rescue => ex Scheduler.handle_job_exception(ex, {message: "Processing scheduled job queue"}) ensure @running = false end |
#reschedule_orphans ⇒ Object
47 48 49 50 51 |
# File 'lib/scheduler/manager.rb', line 47 def reschedule_orphans @manager.reschedule_orphans! rescue => ex Scheduler.handle_job_exception(ex, {message: "Scheduling manager orphan rescheduler"}) end |
#stop! ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/scheduler/manager.rb', line 81 def stop! @mutex.synchronize do @thread.kill @keep_alive_thread.kill @reschedule_orphans_thread.kill end end |
#wait_till_done ⇒ Object
93 94 95 96 97 98 99 100 101 102 |
# File 'lib/scheduler/manager.rb', line 93 def wait_till_done while !@queue.empty? && !(@queue.num_waiting > 0) sleep 0.001 end # this is a hack, but is only used for test anyway sleep 0.001 while @running sleep 0.001 end end |