Class: Toro::Manager
- Inherits:
-
Object
- Object
- Toro::Manager
- Includes:
- Actor, ActorManager
- Defined in:
- lib/toro/manager.rb
Instance Attribute Summary collapse
-
#busy ⇒ Object
readonly
Returns the value of attribute busy.
-
#ready ⇒ Object
readonly
Returns the value of attribute ready.
Instance Method Summary collapse
- #assign(job) ⇒ Object
- #clean_up_for_graceful_shutdown ⇒ Object
- #dispatch ⇒ Object
- #hard_shutdown_in(delay) ⇒ Object
- #heartbeat ⇒ Object
-
#initialize(options = {}) ⇒ Manager
constructor
A new instance of Manager.
- #is_ready? ⇒ Boolean
- #processor_complete(processor) ⇒ Object
- #requeue ⇒ Object
- #set_thread(proxy_id, thread) ⇒ Object
- #shutdown ⇒ Object
- #signal_shutdown ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
Methods included from ActorManager
Methods included from Actor
Constructor Details
#initialize(options = {}) ⇒ Manager
Returns a new instance of Manager.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/toro/manager.rb', line 10
#
# Builds the manager state: a pool of linked processors, a fetcher and a
# listener.
#
# NOTE(review): the identifier `options` was missing from the extracted
# listing (`def initialize(={})`, `Toro.[:default_queue]`). It is restored
# here from the documented signature `#initialize(options = {})`; the
# `Toro.options` receiver is an assumption -- confirm against the real file.
#
# @param options [Hash] overrides merged on top of the defaults
#   (`concurrency: 1`, `queues: [Toro.options[:default_queue]]`)
# @return [Manager] a new instance of Manager
def initialize(options = {})
  defaults = {
    concurrency: 1,
    queues: [Toro.options[:default_queue]],
  }
  options = defaults.merge(options)

  @queues = options[:queues]
  # Keyed by proxy id; written by #set_thread, read by #hard_shutdown_in.
  @threads = {}

  # One processor per unit of concurrency, each linked to this actor.
  @ready = options[:concurrency].times.map do
    processor = Processor.new_link(current_actor)
    # proxy_id mirrors object_id, the key later used to look up @threads.
    processor.proxy_id = processor.object_id
    processor
  end
  @busy = []
  @is_done = false

  @fetcher = Fetcher.new({ manager: current_actor, queues: options[:queues] })
  @listener = Listener.new({ queues: @queues, fetcher: @fetcher, manager: current_actor })
end
Instance Attribute Details
#busy ⇒ Object (readonly)
Returns the value of attribute busy.
8 9 10 |
# File 'lib/toro/manager.rb', line 8
#
# Reader for the collection of processors that #assign has moved out of the
# ready pool and that #processor_complete has not yet removed.
#
# @return [Object] the value of attribute busy
def busy
  @busy
end
#ready ⇒ Object (readonly)
Returns the value of attribute ready.
8 9 10 |
# File 'lib/toro/manager.rb', line 8
#
# Reader for the pool of processors that are available to #assign.
#
# @return [Object] the value of attribute ready
def ready
  @ready
end
Instance Method Details
#assign(job) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/toro/manager.rb', line 52
#
# Hands a job to a ready processor: the processor is popped from the ready
# pool, recorded as busy, and asked to process the job through its `async`
# proxy.
#
# @param job [Object] the job passed on to `processor.async.process`
# @raise [RuntimeError] 'No processors ready' when the ready pool is empty
# @return [Object] whatever `processor.async.process(job)` returns
def assign(job)
  raise 'No processors ready' unless is_ready?

  processor = @ready.pop
  @busy << processor
  processor.async.process(job)
end
#clean_up_for_graceful_shutdown ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/toro/manager.rb', line 69
#
# Shuts down immediately when no processor is busy; otherwise schedules
# itself to run again and reports that shutdown has not happened yet.
#
# NOTE(review): the extracted listing read `Toro.[:graceful_shutdown_time]`;
# the receiver is restored as `Toro.options` -- confirm against the real file.
#
# @return [Boolean] true if #shutdown was called now, false if a re-check
#   was scheduled instead
def clean_up_for_graceful_shutdown
  if @busy.empty?
    shutdown
    return true
  end

  after(Toro.options[:graceful_shutdown_time]) { clean_up_for_graceful_shutdown }
  false
end
#dispatch ⇒ Object
63 64 65 66 67 |
# File 'lib/toro/manager.rb', line 63
#
# Asks the fetcher (through its `async` proxy) to fetch, after checking that
# a processor is ready.
#
# @raise [RuntimeError] "No processors, cannot continue!" when both the ready
#   and busy pools are empty
# @raise [RuntimeError] "No ready processor!?" when only the ready pool is
#   empty
# @return [Object] whatever `@fetcher.async.fetch` returns
def dispatch
  raise "No processors, cannot continue!" if @ready.empty? && @busy.empty?
  raise "No ready processor!?" if @ready.empty?

  @fetcher.async.fetch
end
#hard_shutdown_in(delay) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/toro/manager.rb', line 79
#
# Logs the grace period, then schedules a forced shutdown after `delay`:
# jobs are requeued, `Shutdown` is raised in the recorded thread of every
# busy processor that is still alive, and shutdown is signalled.
#
# @param delay [Object] passed to `after`; interpolated into the log message
#   as a number of seconds
# @return [Object] whatever `after` returns
def hard_shutdown_in(delay)
  Toro.logger.info "Pausing up to #{delay} seconds to allow workers to finish..."

  after(delay) do
    # We've reached the timeout and we still have busy processors.
    # They must die but their messages shall live on.
    Toro.logger.warn "Terminating #{@busy.size} busy worker threads"

    requeue

    @busy.each do |processor|
      next unless processor.alive?

      # Threads are keyed by proxy id, which #initialize sets to object_id.
      thread = @threads.delete(processor.object_id)
      thread.raise Shutdown if thread
    end

    signal_shutdown
  end
end
#heartbeat ⇒ Object
119 120 121 122 123 124 125 |
# File 'lib/toro/manager.rb', line 119
#
# Re-schedules itself every 5 (units as interpreted by `after`, presumably
# seconds) until the manager is stopped. The body does nothing else.
#
# @return [nil, Object] nil once stopped, otherwise whatever `after` returns
def heartbeat
  return if stopped?

  after(5) { heartbeat }
end
#is_ready? ⇒ Boolean
59 60 61 |
# File 'lib/toro/manager.rb', line 59
#
# Tells whether at least one processor is waiting in the ready pool.
#
# @return [Boolean] true when the ready pool is not empty
def is_ready?
  !@ready.empty?
end
#processor_complete(processor) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/toro/manager.rb', line 127
#
# Bookkeeping for a processor that has finished: its thread entry and busy
# entry are dropped. When the manager is stopped, the processor is terminated
# (if alive) and shutdown runs once nothing is busy. Otherwise the processor
# goes back to the ready pool (if alive) and another dispatch is made.
#
# @param processor [Object] the processor reporting completion
# @return [Object] result of the last call made in the branch taken
def processor_complete(processor)
  @threads.delete(processor.object_id)
  @busy.delete(processor)

  if stopped?
    processor.terminate if processor.alive?
    shutdown if @busy.empty?
  else
    @ready << processor if processor.alive?
    # Dispatch happens even when the processor was not returned to the pool.
    dispatch
  end
end
#requeue ⇒ Object
104 105 106 107 108 109 |
# File 'lib/toro/manager.rb', line 104
#
# Puts every job this process had started back on the queue: jobs with
# status 'running' and `started_by` equal to `Toro.process_identity` are
# updated to status 'queued' with `started_by` and `started_at` cleared.
#
# @return [Object] whatever `Toro::Database.with_connection` returns
def requeue
  Toro::Database.with_connection do
    in_flight = Job.where(status: 'running', started_by: Toro.process_identity)
    in_flight.update_all(status: 'queued', started_by: nil, started_at: nil)
  end
end
#set_thread(proxy_id, thread) ⇒ Object
115 116 117 |
# File 'lib/toro/manager.rb', line 115
#
# Records the thread associated with a processor's proxy id so that it can
# be looked up (and removed) later.
#
# @param proxy_id [Object] key for the entry
# @param thread [Object] the thread to remember
# @return [Object] the thread that was stored
def set_thread(proxy_id, thread)
  @threads[proxy_id] = thread
end
#shutdown ⇒ Object
99 100 101 102 |
# File 'lib/toro/manager.rb', line 99
#
# Requeues this process's running jobs, then signals shutdown.
#
# @return [Object] whatever #signal_shutdown returns
def shutdown
  requeue
  signal_shutdown
end
#signal_shutdown ⇒ Object
111 112 113 |
# File 'lib/toro/manager.rb', line 111
#
# Schedules `signal(:shutdown)` through `after(0)` rather than calling it
# inline. `after` and `signal` are not defined in this listing (presumably
# supplied by the included Actor module -- confirm).
#
# @return [Object] whatever `after` returns
def signal_shutdown
  after(0) { signal(:shutdown) }
end
#start ⇒ Object
29 30 31 32 33 34 |
# File 'lib/toro/manager.rb', line 29
#
# Marks the manager as running, starts the listener through its `async`
# proxy, dispatches once per ready processor, and begins the heartbeat.
#
# @return [Object] whatever #heartbeat returns
def start
  @is_done = false
  @listener.async.start

  # One dispatch per ready processor.
  @ready.each { dispatch }

  heartbeat
end
#stop ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/toro/manager.rb', line 36
#
# Stops the manager: marks it done, terminates idle processors, the fetcher
# and the listener, then attempts a graceful shutdown and falls back to a
# timed hard shutdown when processors are still busy.
#
# NOTE(review): the extracted listing read `Toro.[:hard_shutdown_time]`; the
# receiver is restored as `Toro.options` -- confirm against the real file.
#
# @return [nil, Object] nil when graceful shutdown completed immediately,
#   otherwise whatever #hard_shutdown_in returns
def stop
  @is_done = true

  Toro.logger.debug "Shutting down #{@ready.size} quiet workers"
  @ready.each { |processor| processor.terminate if processor.alive? }
  @ready.clear

  @fetcher.terminate if @fetcher.alive?

  if @listener.alive?
    # `actors` is not defined in this listing (presumably from ActorManager).
    actors[:listener].stop if actors[:listener]
    @listener.terminate
  end

  return if clean_up_for_graceful_shutdown

  hard_shutdown_in(Toro.options[:hard_shutdown_time])
end
#stopped? ⇒ Boolean
139 140 141 |
# File 'lib/toro/manager.rb', line 139
#
# Reports the done flag: set to false by #initialize and #start, and to true
# by #stop.
#
# @return [Boolean] the current value of the done flag
def stopped?
  @is_done
end