Class: Protobuf::Nats::ThreadPool
- Inherits:
-
Object
- Object
- Protobuf::Nats::ThreadPool
- Defined in:
- lib/protobuf/nats/thread_pool.rb
Instance Method Summary collapse
- #enqueued_size ⇒ Object
- #full? ⇒ Boolean
-
#initialize(size, opts = {}) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #kill ⇒ Object
- #max_size ⇒ Object
-
#on_error(&cb) ⇒ Object
This callback is executed in a thread safe manner.
-
#push(&work_cb) ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
-
#shutdown ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
- #size ⇒ Object
-
#wait_for_termination(seconds = nil) ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
Constructor Details
#initialize(size, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/protobuf/nats/thread_pool.rb', line 5 def initialize(size, opts = {}) @queue = ::Queue.new @active_work = 0 # Callbacks @error_cb = lambda {|_error|} # Synchronization @mutex = ::Mutex.new @cb_mutex = ::Mutex.new # Let's get this party started queue_size = opts[:max_queue].to_i || 0 @max_size = size + queue_size @max_workers = size @shutting_down = false @workers = [] supervise_workers end |
Instance Method Details
#enqueued_size ⇒ Object
25 26 27 |
# File 'lib/protobuf/nats/thread_pool.rb', line 25 def enqueued_size @queue.size end |
#full? ⇒ Boolean
29 30 31 |
# File 'lib/protobuf/nats/thread_pool.rb', line 29 def full? @active_work >= @max_size end |
#kill ⇒ Object
55 56 57 58 |
# File 'lib/protobuf/nats/thread_pool.rb', line 55 def kill @shutting_down = true @workers.map(&:kill) end |
#max_size ⇒ Object
33 34 35 |
# File 'lib/protobuf/nats/thread_pool.rb', line 33 def max_size @max_size end |
#on_error(&cb) ⇒ Object
This callback is executed in a thread safe manner.
73 74 75 |
# File 'lib/protobuf/nats/thread_pool.rb', line 73 def on_error(&cb) @error_cb = cb end |
#push(&work_cb) ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
39 40 41 42 43 44 45 46 |
# File 'lib/protobuf/nats/thread_pool.rb', line 39 def push(&work_cb) return false if full? return false if @shutting_down @queue << [:work, work_cb] @mutex.synchronize { @active_work += 1 } supervise_workers true end |
#shutdown ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
50 51 52 53 |
# File 'lib/protobuf/nats/thread_pool.rb', line 50 def shutdown @shutting_down = true @max_workers.times { @queue << [:stop, nil] } end |
#size ⇒ Object
77 78 79 |
# File 'lib/protobuf/nats/thread_pool.rb', line 77 def size @active_work end |
#wait_for_termination(seconds = nil) ⇒ Object
This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
62 63 64 65 66 67 68 69 70 |
# File 'lib/protobuf/nats/thread_pool.rb', line 62 def wait_for_termination(seconds = nil) started_at = ::Time.now loop do sleep 0.1 break if seconds && (::Time.now - started_at) >= seconds break if @workers.empty? prune_dead_workers end end |