Class: Delayed::Master::Worker::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/master/worker/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(worker, size) ⇒ ThreadPool

Returns a new instance of ThreadPool.



7
8
9
10
11
12
# File 'lib/delayed/master/worker/thread_pool.rb', line 7

# Sets up the pool state; no threads are started here.
#
# @param worker [Object] passed to the :thread lifecycle callbacks by
#   #schedule and #work
# @param size [Integer] capacity of the internal queue, and the number of
#   threads #work spawns
def initialize(worker, size)
  @worker = worker
  @size = size
  # Seconds #schedule sleeps between checks for a thread waiting on the queue.
  @queue_delay = 0.5
  # Bounded queue: a push blocks while +size+ items are already pending.
  @queue = SizedQueue.new(size)
end

Instance Method Details

#schedule ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/delayed/master/worker/thread_pool.rb', line 14

# Starts the scheduler thread, which feeds the queue from the given block.
#
# The thread runs inside the :thread lifecycle callbacks. On each pass it
# waits until at least one thread is blocked on the queue, then calls the
# block to obtain the next item. A truthy item is pushed onto the queue; a
# nil/false result pushes one :exit marker per pool slot and ends the thread.
#
# @yieldreturn [Object, nil] the next item to enqueue, or nil/false to stop
# @return [Thread] the scheduler thread (also stored in @scheduler)
def schedule
  @scheduler = Thread.new do
    Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do
      loop do
        # Poll until some consumer is idle, so items are only fetched on demand.
        sleep @queue_delay while @queue.num_waiting.zero?

        item = yield
        unless item
          # Source exhausted: tell every worker thread to stop.
          @size.times { @queue.push(:exit) }
          break
        end

        @queue.push(item)
        Thread.pass
      end
    end
  end
end

#shutdown ⇒ Object



56
57
58
59
60
# File 'lib/delayed/master/worker/thread_pool.rb', line 56

# Forcibly stops the pool: kills the scheduler thread and every worker
# thread, then closes the queue.
#
# @scheduler and @threads are only assigned by #schedule and #work, so they
# may still be nil if shutdown is requested before the pool was started.
# Safe navigation skips the missing pieces so the queue is always closed.
#
# @return [SizedQueue] the closed queue
def shutdown
  @scheduler&.kill
  @threads&.each(&:kill)
  @queue.close
end

#wait ⇒ Object



51
52
53
54
# File 'lib/delayed/master/worker/thread_pool.rb', line 51

# Blocks until the scheduler thread and then every worker thread has finished.
#
# @return [Array<Thread>] the worker threads
def wait
  @scheduler.join
  @threads.each do |worker_thread|
    worker_thread.join
  end
end

#work ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/delayed/master/worker/thread_pool.rb', line 34

# Spawns @size worker threads that consume items from the queue.
#
# Each thread runs inside the :thread lifecycle callbacks and yields every
# popped item to the given block until it is told to stop.
#
# @yieldparam item [Object] an item taken from the queue
# @return [Array<Thread>] the worker threads (also stored in @threads)
def work
  @threads = Array.new(@size) do
    Thread.new do
      Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do
        loop do
          item = @queue.pop
          # :exit is the scheduler's stop marker. nil is what Queue#pop
          # returns once the queue is closed and drained; without this guard
          # the thread would yield nil to the block in an endless tight loop.
          break if item.nil? || item == :exit

          yield item
        end
      end
    end
  end
end