Class: Lossfully::ThreadPool
- Inherits:
-
Object
- Object
- Lossfully::ThreadPool
- Defined in:
- lib/lossfully/thread_pool.rb
Overview
There’s (at least) two ways to do this: 1) the Ruby Recipes way, which is to make a thread for every incoming task, but put it to sleep until there’s room in the pool for another task, or 2) have a pool of threads that eat tasks from a queue. This implements (2), mainly because it seemed more fun to me. But also because it doesn’t require the explicit use of Mutexes at all; it uses them, for the sake of sending signals with ConditionVaribles, but if those signals aren’t received there would be a delay of at most 1 second.
Another useful thing about this implementation is for the case when every task you anticipate adding to the ThreadPool of the same general form. Then then ThreadPool can be initialized with a block and you can just add objects to the task queue.
Constant Summary collapse
- DEFAULT_BLOCK =
lambda {|block, &blk| block = blk if block_given? ; block.call}
Instance Attribute Summary collapse
-
#completed ⇒ Object
readonly
Returns the value of attribute completed.
-
#max_size ⇒ Object
Returns the value of attribute max_size.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#total ⇒ Object
readonly
Returns the value of attribute total.
Instance Method Summary collapse
- #<<(block_or_item) ⇒ Object
- #current ⇒ Object
-
#initialize(max_size = 1, block = nil, &blk) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #join ⇒ Object
- #kill ⇒ Object
- #process(block_or_item = nil, &blk) ⇒ Object (also: #enq, #dispatch)
- #queue_size ⇒ Object
- #size ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(max_size = 1, block = nil, &blk) ⇒ ThreadPool
Returns a new instance of ThreadPool.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/lossfully/thread_pool.rb', line 44 def initialize(max_size = 1, block=nil, &blk) @running = true @joining = false @mutex = Mutex.new @cv = ConditionVariable.new @max_size = max_size block = blk if block_given? @block = block.nil? ? DEFAULT_BLOCK : block @queue = Queue.new @workers = [] @master = master_thread @completed = 0 @total = 0 end |
Instance Attribute Details
#completed ⇒ Object (readonly)
Returns the value of attribute completed.
76 77 78 |
# File 'lib/lossfully/thread_pool.rb', line 76 def completed @completed end |
#max_size ⇒ Object
Returns the value of attribute max_size.
76 77 78 |
# File 'lib/lossfully/thread_pool.rb', line 76 def max_size @max_size end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
76 77 78 |
# File 'lib/lossfully/thread_pool.rb', line 76 def mutex @mutex end |
#total ⇒ Object (readonly)
Returns the value of attribute total.
76 77 78 |
# File 'lib/lossfully/thread_pool.rb', line 76 def total @total end |
Instance Method Details
#<<(block_or_item) ⇒ Object
85 86 87 |
# File 'lib/lossfully/thread_pool.rb', line 85 def << block_or_item process block_or_item end |
#current ⇒ Object
72 73 74 |
# File 'lib/lossfully/thread_pool.rb', line 72 def current @total - @queue.size end |
#join ⇒ Object
89 90 91 92 93 94 95 96 97 |
# File 'lib/lossfully/thread_pool.rb', line 89 def join @running = false @joining = true signal_master # A weird bug happens on this next line if you don't test # @master.alive?, but only sometimes. I don't care enough to # figure it out right now. @master.join if @master.alive? end |
#kill ⇒ Object
112 113 114 115 116 |
# File 'lib/lossfully/thread_pool.rb', line 112 def kill @queue.clear @workers.each(&:kill) join end |
#process(block_or_item = nil, &blk) ⇒ Object Also known as: enq, dispatch
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/lossfully/thread_pool.rb', line 60 def process (block_or_item=nil, &blk) block_or_item = blk if block_given? if block_or_item.respond_to?(:call) @queue << block_or_item else @queue << lambda { @block.call(block_or_item) } end # @mutex.synchronize { @total +=1 } @total += 1 signal_master end |
#queue_size ⇒ Object
103 104 105 |
# File 'lib/lossfully/thread_pool.rb', line 103 def queue_size @queue.size end |
#size ⇒ Object
99 100 101 |
# File 'lib/lossfully/thread_pool.rb', line 99 def size @workers.size end |
#stop ⇒ Object
107 108 109 110 |
# File 'lib/lossfully/thread_pool.rb', line 107 def stop @queue.clear join end |