Class: Concur::ThreadPool
- Inherits:
-
Executor::Base
- Object
- Executor::Base
- Concur::ThreadPool
- Defined in:
- lib/thread_pool.rb
Overview
Another example is here: # from: stackoverflow.com/questions/81788/deadlock-in-threadpool
Defined Under Namespace
Classes: UberThread
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #execute(runnable = nil, channel = nil, &blk) ⇒ Object
-
#initialize(max_size) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #process(callable, &blk) ⇒ Object
- #queue_size ⇒ Object
- #shutdown ⇒ Object
- #start_thread ⇒ Object
-
#update(changes) ⇒ Object
listen for config changes.
Constructor Details
#initialize(max_size) ⇒ ThreadPool
Returns a new instance of ThreadPool.
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/thread_pool.rb', line 15 def initialize(max_size) @max_size = max_size # @thread_queue = SizedQueue.new(max_size) @running = true @mutex = Mutex.new @cv = ConditionVariable.new @queue = Queue.new @threads = [] end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
13 14 15 |
# File 'lib/thread_pool.rb', line 13 def queue @queue end |
Instance Method Details
#execute(runnable = nil, channel = nil, &blk) ⇒ Object
44 45 46 47 48 |
# File 'lib/thread_pool.rb', line 44 def execute(runnable=nil, channel=nil, &blk) f = StandardFuture.new(runnable, channel, &blk) process(f) f end |
#process(callable, &blk) ⇒ Object
38 39 40 41 42 |
# File 'lib/thread_pool.rb', line 38 def process(callable, &blk) callable = blk if block_given? @queue.push(callable) start_thread end |
#queue_size ⇒ Object
69 70 71 |
# File 'lib/thread_pool.rb', line 69 def queue_size @queue.size end |
#shutdown ⇒ Object
26 27 28 |
# File 'lib/thread_pool.rb', line 26 def shutdown @running = false end |
#start_thread ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/thread_pool.rb', line 51 def start_thread @mutex.synchronize do if !@queue.empty? && @threads.size <= @max_size t = UberThread.new do while @running f = @queue.pop f.thread = t f.call end # Concur.logger.info "Thread dying " + t.inspect end Concur.logger.debug "Created new thread " + t.inspect @threads << t end end end |
#update(changes) ⇒ Object
listen for config changes
31 32 33 34 35 36 |
# File 'lib/thread_pool.rb', line 31 def update(changes) if changes[:max_threads] @max_size = changes[:max_threads] puts "Changed max size to #{changes[:max_threads]}" end end |