Class: MessageBus::TimerThread
- Inherits:
-
Object
- Object
- MessageBus::TimerThread
- Defined in:
- lib/message_bus/timer_thread.rb
Defined Under Namespace
Classes: Cancelable, CancelableEvery
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
Instance Method Summary collapse
- #every(delay, &block) ⇒ Object
-
#initialize ⇒ TimerThread
constructor
A new instance of TimerThread.
- #on_error(&block) ⇒ Object
-
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after a certain delay (in seconds).
- #stop ⇒ Object
Constructor Details
#initialize ⇒ TimerThread
Returns a new instance of TimerThread.
33 34 35 36 37 38 39 40 |
# File 'lib/message_bus/timer_thread.rb', line 33
# Builds an empty timer and starts its background worker thread.
#
# Every instance variable, including the default error handler, is assigned
# before the worker thread is spawned, so the thread never starts against a
# partially initialised timer.
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  @next = nil
  # Default handler: report the failure on STDERR. The exception class and
  # message are included, and Array() tolerates a nil backtrace (an exception
  # object that was never raised has none).
  @on_error = lambda do |e|
    trace = Array(e.backtrace).join("\n")
    STDERR.puts "Exception while processing Timer:\n #{e.class}: #{e.message}\n #{trace}"
  end
  @thread = Thread.new { do_work }
end
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
4 5 6 |
# File 'lib/message_bus/timer_thread.rb', line 4
# Reader for the jobs attribute.
#
# @return [Object] the current value of @jobs (the same object, not a copy)
def jobs
  @jobs
end
Instance Method Details
#every(delay, &block) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/message_bus/timer_thread.rb', line 61
# Runs the given block repeatedly, re-queueing it with the same delay after
# each run.
#
# The next run is queued from an ensure clause, so it is scheduled even when
# the block raises; the exception itself still propagates to whoever invoked
# the queued job.
#
# @param delay [Numeric] delay passed to #queue for every run
# @return [CancelableEvery] handle whose `current` is set to the most
#   recently queued job
def every(delay, &block)
  handle = CancelableEvery.new

  tick = proc do
    begin
      block.call
    ensure
      # Schedule the following run and point the handle at it.
      handle.current = queue(delay, &tick)
    end
  end

  handle.current = queue(delay, &tick)
  handle
end
#on_error(&block) ⇒ Object
106 107 108 |
# File 'lib/message_bus/timer_thread.rb', line 106
# Replaces the handler stored in @on_error with the given block.
#
# A missing block is rejected up front: storing nil would only fail later,
# at the moment the handler is invoked.
#
# @raise [ArgumentError] if no block is given (the previous handler is kept)
# @return [Proc] the newly installed handler
def on_error(&block)
  raise ArgumentError, "on_error requires a block" unless block

  @on_error = block
end
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after a certain delay (in seconds).
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/message_bus/timer_thread.rb', line 75
# Queue a block to run after a certain delay (in seconds).
#
# The job is stored as a [run_at, block] pair, where run_at is the current
# monotonic clock reading plus delay. It is inserted into @jobs directly
# after the last job whose run_at is strictly earlier, so an already ordered
# list stays ordered by ascending run_at.
#
# @param delay [Numeric] seconds from now at which the block becomes due
# @return [Cancelable] wrapper built from the stored job pair
# @raise [ThreadError] only if waking the worker fails while it is still alive
def queue(delay = 0, &block)
  queue_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay
  job = [queue_time, block]

  @mutex.synchronize do
    last_earlier = @jobs.rindex { |run_at, _| run_at < queue_time }
    position = last_earlier ? last_earlier + 1 : 0
    @jobs.insert(position, job)
    # The new job is now the earliest one, so record its time in @next.
    @next = queue_time if position == 0
  end

  # Restart the worker if it has died; the liveness check is repeated under
  # the mutex so two concurrent callers do not both spawn a thread.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  worker = @thread
  begin
    worker.wakeup if worker.status == "sleep"
  rescue ThreadError
    # Thread#wakeup raises when the thread is already dead. The worker can
    # finish between the status check and the wakeup; only re-raise when it
    # is still alive, mirroring the guard used in #stop.
    raise if worker.alive?
  end

  Cancelable.new(job)
end
#stop ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/message_bus/timer_thread.rb', line 42
# Marks the timer as stopped and blocks until the worker thread is no longer
# alive.
#
# Each pass takes the mutex, checks whether a worker exists and is alive, and
# if so wakes it. A ThreadError from the wakeup is ignored when the thread
# has died in the meantime and re-raised otherwise. `sleep 0` then yields to
# other threads before the loop decides whether to go round again.
#
# @return [nil]
def stop
  @stopped = true

  loop do
    worker_alive = @mutex.synchronize do
      alive = @thread && @thread.alive?
      if alive
        begin
          @thread.wakeup
        rescue ThreadError
          raise if @thread.alive?
        end
      end
      alive
    end

    sleep 0
    break unless worker_alive
  end
end