Class: Bunny::ConsumerWorkPool
- Inherits:
-
Object
- Object
- Bunny::ConsumerWorkPool
- Defined in:
- lib/bunny/consumer_work_pool.rb
Overview
Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.
Every channel gets its own consumer pool.
Instance Attribute Summary collapse
-
#abort_on_exception ⇒ Object
readonly
Returns the value of attribute abort_on_exception.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#threads ⇒ Object
readonly
API.
Instance Method Summary collapse
- #backlog ⇒ Object
- #busy? ⇒ Boolean
-
#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool
constructor
A new instance of ConsumerWorkPool.
- #join(timeout = nil) ⇒ Object
- #kill ⇒ Object
- #pause ⇒ Object
- #resume ⇒ Object
- #running? ⇒ Boolean
- #shutdown(wait_for_workers = false) ⇒ Object
- #start ⇒ Object
- #submit(callable = nil, &block) ⇒ Object
Constructor Details
#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool
Returns a new instance of ConsumerWorkPool.
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/bunny/consumer_work_pool.rb', line 20
#
# Builds an idle pool: an empty work queue, with both the running and the
# paused flags cleared. No threads are created here.
#
# @param size [Integer] stored as the pool size
# @param abort_on_exception [Boolean] stored and exposed through the reader
#   of the same name
# @param shutdown_timeout [Numeric, nil] stored for later use as the wait
#   timeout during shutdown
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  # Configuration supplied by the caller.
  @size               = size
  @abort_on_exception = abort_on_exception
  @shutdown_timeout   = shutdown_timeout

  # Work queue plus the mutex/condition pair used when waiting on shutdown.
  @queue                = ::Queue.new
  @shutdown_mutex       = ::Mutex.new
  @shutdown_conditional = ::ConditionVariable.new

  # Lifecycle flags: a fresh pool is neither running nor paused.
  @running = false
  @paused  = false
end
Instance Attribute Details
#abort_on_exception ⇒ Object (readonly)
Returns the value of attribute abort_on_exception.
18 19 20 |
# File 'lib/bunny/consumer_work_pool.rb', line 18
#
# Read-only accessor.
#
# @return [Object] the current value of @abort_on_exception
def abort_on_exception
  @abort_on_exception
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
17 18 19 |
# File 'lib/bunny/consumer_work_pool.rb', line 17
#
# Read-only accessor.
#
# @return [Object] the current value of @size
def size
  @size
end
#threads ⇒ Object (readonly)
API
16 17 18 |
# File 'lib/bunny/consumer_work_pool.rb', line 16
#
# Read-only accessor.
#
# @return [Object] the current value of @threads
def threads
  @threads
end
Instance Method Details
#backlog ⇒ Object
52 53 54 |
# File 'lib/bunny/consumer_work_pool.rb', line 52
#
# Reports how many items are currently sitting in the work queue.
#
# @return [Integer] the queue's current length
def backlog
  @queue.size
end
#busy? ⇒ Boolean
56 57 58 |
# File 'lib/bunny/consumer_work_pool.rb', line 56
#
# Tells whether the work queue still holds items. It looks only at the
# queue, not at what any thread is doing.
#
# @return [Boolean] true when the queue is non-empty, false otherwise
def busy?
  return false if @queue.empty?

  true
end
#join(timeout = nil) ⇒ Object
77 78 79 |
# File 'lib/bunny/consumer_work_pool.rb', line 77
#
# Joins every thread held in @threads, one after another. The timeout is
# passed to each individual Thread#join, so it limits each wait separately
# rather than the total time spent here.
#
# @param timeout [Numeric, nil] forwarded unchanged to Thread#join
# @return [Array] the list that was iterated (empty when @threads is nil)
def join(timeout = nil)
  workers = @threads || []
  workers.each do |worker|
    worker.join(timeout)
  end
end
#kill ⇒ Object
93 94 95 96 97 |
# File 'lib/bunny/consumer_work_pool.rb', line 93
#
# Clears the running flag and calls Thread#kill on every thread held in
# @threads. Does nothing to threads when @threads is nil.
#
# @return [Array] the list that was iterated (empty when @threads is nil)
def kill
  @running = false

  workers = @threads || []
  workers.each(&:kill)
end
#pause ⇒ Object
81 82 83 84 |
# File 'lib/bunny/consumer_work_pool.rb', line 81
#
# Marks the pool as paused and no longer running. Only the two flags are
# changed here; no thread is touched by this method.
#
# @return [Boolean] true (the value assigned to @paused)
def pause
  # Order matters for the return value: the last assignment is what callers get.
  @running = false
  @paused  = true
end
#resume ⇒ Object
86 87 88 89 90 91 |
# File 'lib/bunny/consumer_work_pool.rb', line 86
#
# Marks the pool as running and not paused, then calls Thread#run on every
# thread held in @threads to wake it.
#
# @threads is only assigned once the pool has been started, so it may still
# be nil here. It is treated as an empty list, matching what #join and #kill
# do, instead of raising NoMethodError on nil.
#
# @return [Array] the list that was iterated (empty when @threads is nil)
def resume
  @running = true
  @paused  = false

  (@threads || []).each { |t| t.run }
end
#running? ⇒ Boolean
48 49 50 |
# File 'lib/bunny/consumer_work_pool.rb', line 48
#
# Exposes the running flag.
#
# @return [Boolean] the current value of @running
def running?
  @running
end
#shutdown(wait_for_workers = false) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/bunny/consumer_work_pool.rb', line 60
#
# Stops the pool. Clears the running flag and submits one job per configured
# worker (@size in total); each of those jobs throws :terminate when called.
# NOTE(review): the matching catch(:terminate) is presumably in the worker
# loop, which is not visible here - confirm.
#
# When wait_for_workers is truthy, a shutdown timeout is configured, and the
# pool was running on entry, this then waits on the shutdown condition
# variable for at most @shutdown_timeout seconds, but only if the queue
# still holds items at that point.
#
# @param wait_for_workers [Boolean] whether to block as described above
# @return [Object, nil] nil when no waiting is requested or needed; otherwise
#   whatever ConditionVariable#wait returns
def shutdown(wait_for_workers = false)
  # Capture the state before clearing the flag: the wait below applies only
  # to a pool that was running when shutdown was requested.
  was_running = running?
  @running = false

  # One terminator job per worker.
  @size.times do
    submit { |*_args| throw :terminate }
  end

  should_wait = wait_for_workers && @shutdown_timeout && was_running
  return unless should_wait

  @shutdown_mutex.synchronize do
    next unless busy?

    @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
  end
end
#start ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/bunny/consumer_work_pool.rb', line 36
#
# Spawns @size threads, each executing #run_loop, and records them in
# @threads (which is reset to a fresh array first). When the pool's
# abort_on_exception setting is truthy, the same flag is switched on for
# every spawned thread. The running flag is raised once all threads have
# been created.
#
# @return [Boolean] true (the value assigned to @running)
def start
  @threads = []

  @size.times do
    worker = Thread.new(&method(:run_loop))
    worker.abort_on_exception = true if abort_on_exception
    @threads.push(worker)
  end

  @running = true
end
#submit(callable = nil, &block) ⇒ Object
32 33 34 |
# File 'lib/bunny/consumer_work_pool.rb', line 32
#
# Enqueues a unit of work. The explicit callable wins when given; otherwise
# the block is enqueued. If neither is supplied, nil is pushed onto the queue.
#
# @param callable [#call, nil] work item to enqueue
# @param block [Proc] used as the work item when callable is nil or false
# @return [Queue] the work queue
def submit(callable = nil, &block)
  job = callable || block
  @queue << job
end