Class: Delayed::Master::Worker::ThreadPool
- Inherits:
-
Object
- Object
- Delayed::Master::Worker::ThreadPool
- Defined in:
- lib/delayed/master/worker/thread_pool.rb
Instance Method Summary collapse
-
#initialize(worker, size) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #schedule ⇒ Object
- #shutdown ⇒ Object
- #wait ⇒ Object
- #work ⇒ Object
Constructor Details
#initialize(worker, size) ⇒ ThreadPool
Returns a new instance of ThreadPool.
7 8 9 10 11 12 |
# File 'lib/delayed/master/worker/thread_pool.rb', line 7 def initialize(worker, size) @worker = worker @size = size @queue = SizedQueue.new(@size) @queue_delay = 0.5 end |
Instance Method Details
#schedule ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/delayed/master/worker/thread_pool.rb', line 14 def schedule @scheduler = Thread.new do Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do loop do while @queue.num_waiting == 0 sleep @queue_delay end if item = yield @queue.push(item) Thread.pass else @size.times { @queue.push(:exit) } break end end end end end |
#shutdown ⇒ Object
56 57 58 59 60 |
# File 'lib/delayed/master/worker/thread_pool.rb', line 56 def shutdown @scheduler.kill @threads.each(&:kill) @queue.close end |
#wait ⇒ Object
51 52 53 54 |
# File 'lib/delayed/master/worker/thread_pool.rb', line 51 def wait @scheduler.join @threads.each(&:join) end |
#work ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/delayed/master/worker/thread_pool.rb', line 34 def work @threads = @size.times.map do Thread.new do Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do loop do item = @queue.pop if item == :exit break else yield item end end end end end end |