Class: MicroQ::Manager::Default
- Inherits: Object
  - Object
  - MicroQ::Manager::Default
- Includes: Celluloid
- Defined in: lib/micro_q/manager/default.rb
Overview
The default manager implementation. It wraps a Queue and a pool of Workers. After start! has been called, the manager tries to dequeue messages from the queue at each time slice (every 2 seconds, per the after(2) call in #start) and performs each message on the worker pool.
The pool of workers (more info):
https://github.com/celluloid/celluloid/wiki/Pools
The pool manages asynchronously assigning messages to available workers, handles exceptions by restarting the dead actors and is generally a beautiful abstraction on top of a group of linked actors/threads.
Instance Attribute Summary
- #busy ⇒ Object (readonly)
  Returns the value of attribute busy.
- #current ⇒ Object (readonly)
  Returns the value of attribute current.
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Class Method Summary
- .shutdown! ⇒ Object
- .shutdown? ⇒ Boolean
Instance Method Summary
- #build_missing_workers ⇒ Object
  Don’t shrink the pool if the config changes.
- #kill_all ⇒ Object
- #missing_worker_count ⇒ Object
- #reinitialize(*args) ⇒ Object (also: #initialize)
  Handle init/death of the Queue or the Worker pool. When a worker dies, the args are (#<Actor …>, #<Exception>).
- #start ⇒ Object
- #work_done(worker) ⇒ Object
- #work_on(message) ⇒ Object
Instance Attribute Details
#busy ⇒ Object (readonly)
Returns the value of attribute busy.
    # File 'lib/micro_q/manager/default.rb', line 23
    def busy
      @busy
    end
#current ⇒ Object (readonly)
Returns the value of attribute current.
    # File 'lib/micro_q/manager/default.rb', line 23
    def current
      @current
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/micro_q/manager/default.rb', line 23
    def queue
      @queue
    end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
    # File 'lib/micro_q/manager/default.rb', line 23
    def workers
      @workers
    end
Class Method Details
.shutdown! ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 105
    def self.shutdown!
      @shutdown = true
    end
.shutdown? ⇒ Boolean
    # File 'lib/micro_q/manager/default.rb', line 101
    def self.shutdown?
      !!@shutdown
    end
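The two class methods above amount to a class-level flag. A minimal sketch of that pattern in plain Ruby follows; `SketchManager` is a hypothetical stand-in, not the real MicroQ manager, and no Celluloid is involved.

```ruby
# SketchManager is a hypothetical stand-in, not part of MicroQ.
class SketchManager
  # Stores the flag in a class-level instance variable, so it belongs
  # to the class itself rather than to any one instance.
  def self.shutdown!
    @shutdown = true
  end

  # @shutdown is nil until shutdown! runs; !! coerces nil to false.
  def self.shutdown?
    !!@shutdown
  end
end

flag_before = SketchManager.shutdown?
SketchManager.shutdown!
flag_after = SketchManager.shutdown?

puts "before: #{flag_before}, after: #{flag_after}"
```

Because the flag lives on the class, it is visible to every instance, which appears to be why #reinitialize checks it through self.class.shutdown?.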
Instance Method Details
#build_missing_workers ⇒ Object
Prunes dead workers and tops the pool back up to the configured size. Never shrinks the pool if the config changes.
    # File 'lib/micro_q/manager/default.rb', line 82
    def build_missing_workers
      return if MicroQ.queue_only?

      workers.select!(&:alive?)
      busy.select!(&:alive?)

      missing_worker_count.times do
        workers << MicroQ.config.worker.new_link(current_actor)
      end
    end
#kill_all ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 97
    def kill_all
      (workers + busy).each {|w| w.terminate if w.alive? }
    end
#missing_worker_count ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 93
    def missing_worker_count
      [MicroQ.config.workers - (workers.size + busy.size), 0].max
    end
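The clamp to zero in #missing_worker_count is what keeps the pool from shrinking. A small sketch of the arithmetic, using a hypothetical `missing_count` helper that takes plain integers instead of reading MicroQ.config:

```ruby
# Hypothetical helper that mirrors the expression in #missing_worker_count.
# configured: target pool size; idle and busy: current worker counts.
def missing_count(configured, idle, busy)
  [configured - (idle + busy), 0].max
end

puts missing_count(5, 2, 1) # pool is short of the target
puts missing_count(3, 4, 2) # pool is larger than the target; clamped to zero
```

When the configured size drops below the current pool size, the result is zero rather than negative, so no workers are added and none are removed.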
#reinitialize(*args) ⇒ Object Also known as: initialize
Handle init/death of the Queue or the Worker pool. When a worker dies, the args are (#<Actor …>, #<Exception>).
    # File 'lib/micro_q/manager/default.rb', line 60
    def reinitialize(*args)
      kill_all and return if self.class.shutdown?

      unless @queue && queue.alive?
        @queue = MicroQ.config.queue.new_link
      end

      @busy ||= []
      @workers ||= []
      @current ||= {}

      if args.any?
        message = current.delete(args.first)
        queue.finished!(message) if queue.respond_to?(:finished)
      end

      build_missing_workers
    end
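Since #reinitialize is also known as #initialize, one method body serves as both constructor and crash handler. The sketch below illustrates that idiom in plain Ruby under simplifying assumptions: `SketchSupervisor`, its `TARGET` size, and the symbol "workers" are all hypothetical, and the dead member is removed explicitly rather than through an alive? check.

```ruby
# SketchSupervisor is hypothetical; symbols stand in for worker actors.
class SketchSupervisor
  TARGET = 2 # assumed pool size for this sketch

  attr_reader :pool, :crashes

  # Runs with no args as the constructor, and with
  # (dead_member, exception) when used as a crash handler.
  def reinitialize(*args)
    @pool    ||= [] # ||= keeps existing state on later calls
    @crashes ||= 0
    @next_id ||= 0

    if args.any?
      @pool.delete(args.first) # drop the dead member
      @crashes += 1
    end

    # Top the pool back up to the target size.
    while @pool.size < TARGET
      @next_id += 1
      @pool << :"worker_#{@next_id}"
    end
  end
  alias_method :initialize, :reinitialize
end

supervisor = SketchSupervisor.new
dead = supervisor.pool.first
supervisor.reinitialize(dead, RuntimeError.new("boom"))
puts supervisor.pool.inspect
```

The `||=` assignments are what make the method safe to call repeatedly: the first call builds the state, later calls only repair it.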
#start ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 25
    def start
      return if MicroQ.queue_only?

      count = workers.size

      if (messages = queue.dequeue(count)).any?
        messages.each do |message|
          work_on(message)
        end
      end

      after(2) { start }
    end
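#start asks the queue for as many messages as there are idle workers. A plain-Ruby sketch of one such polling pass follows. `SketchQueue` and `poll_once` are hypothetical, and the sketch assumes that dequeue(limit) returns at most `limit` messages, which this page does not confirm for MicroQ queues.

```ruby
# SketchQueue is a hypothetical in-memory queue. It assumes dequeue(limit)
# returns at most `limit` messages; the real MicroQ queue may differ.
class SketchQueue
  def initialize(messages)
    @messages = messages.dup
  end

  def dequeue(limit)
    @messages.shift(limit) # removes and returns up to `limit` items
  end
end

# One polling pass: take no more messages than there are idle workers.
def poll_once(queue, idle_workers)
  queue.dequeue(idle_workers)
end

queue = SketchQueue.new(%w[a b c d e])
3.times do |pass|
  batch = poll_once(queue, 2)
  puts "pass #{pass + 1}: #{batch.inspect}"
end
```

The real method does not loop; it reschedules itself with after(2) { start }, so each pass runs as a separate timer callback on the actor.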
#work_done(worker) ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 39
    def work_done(worker)
      message = current.delete(worker)
      queue.finished!(message) if queue.respond_to?(:finished)

      busy.delete(worker)
      workers.push(worker)
    end
#work_on(message) ⇒ Object
    # File 'lib/micro_q/manager/default.rb', line 47
    def work_on(message)
      worker = workers.pop
      busy << worker
      current[worker] = message

      worker.perform!(message)
    end
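Taken together, #work_on and #work_done move a worker between the idle list and the busy list while tracking its message in current. A synchronous plain-Ruby sketch of that bookkeeping, with a hypothetical `SketchPool` class and symbols standing in for worker actors:

```ruby
# SketchPool is a hypothetical, synchronous model of the manager's
# bookkeeping; symbols stand in for worker actors.
class SketchPool
  attr_reader :idle, :busy, :current

  def initialize(worker_names)
    @idle    = worker_names.dup
    @busy    = []
    @current = {}
  end

  # Mirrors #work_on: move an idle worker to busy and remember its message.
  def work_on(message)
    worker = @idle.pop
    @busy << worker
    @current[worker] = message
    worker
  end

  # Mirrors #work_done: forget the message and return the worker to idle.
  def work_done(worker)
    message = @current.delete(worker)
    @busy.delete(worker)
    @idle.push(worker)
    message
  end
end

pool = SketchPool.new([:w1, :w2])
worker = pool.work_on("job-1")
puts "busy=#{pool.busy.inspect} idle=#{pool.idle.inspect}"
pool.work_done(worker)
puts "busy=#{pool.busy.inspect} idle=#{pool.idle.inspect}"
```

Like the listing, the sketch does not guard against an empty idle list; it relies on the caller never handing over more messages than there are idle workers.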