Class: Protobuf::Nats::ThreadPool
- Inherits: Object
  - Object
  - Protobuf::Nats::ThreadPool
- Defined in: lib/protobuf/nats/thread_pool.rb
Instance Method Summary
- #full? ⇒ Boolean
- #initialize(size, opts = {}) ⇒ ThreadPool (constructor)
  A new instance of ThreadPool.
- #kill ⇒ Object
- #on_error(&cb) ⇒ Object
  This callback is executed in a thread safe manner.
- #push(&work_cb) ⇒ Object
  This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
- #shutdown ⇒ Object
  This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
- #size ⇒ Object
- #wait_for_termination(seconds = nil) ⇒ Object
  This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.
Constructor Details
#initialize(size, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
    # File 'lib/protobuf/nats/thread_pool.rb', line 5
    def initialize(size, opts = {})
      @queue = ::Queue.new
      @active_work = 0

      # Callbacks
      @error_cb = lambda {|_error|}

      # Synchronization
      @mutex = ::Mutex.new
      @cb_mutex = ::Mutex.new

      # Let's get this party started
      queue_size = opts[:max_queue].to_i || 0
      @max_size = size + queue_size
      @max_workers = size
      @shutting_down = false
      @workers = []
      supervise_workers
    end
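The capacity arithmetic in the constructor can be checked in isolation. The sketch below is not part of the library: `max_size_for` is a hypothetical helper that repeats only the two lines computing `@max_size`. Because `nil.to_i` is `0` in Ruby, a missing `:max_queue` option already yields a queue size of zero, so the `|| 0` fallback in the listing never takes effect.

```ruby
# Hypothetical helper (not in the gem): mirrors how the constructor
# derives @max_size from the worker count and the :max_queue option.
def max_size_for(size, opts = {})
  queue_size = opts[:max_queue].to_i # nil.to_i is 0, so no fallback is needed
  size + queue_size
end

no_queue    = max_size_for(2)                  # option omitted
with_queue  = max_size_for(2, max_queue: 3)    # integer option
from_string = max_size_for(2, max_queue: "4")  # strings are coerced by to_i

puts no_queue, with_queue, from_string
```

With `size = 2` and `max_queue: 3`, `full?` would start returning true once five work items are in flight.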
Instance Method Details
#full? ⇒ Boolean
    # File 'lib/protobuf/nats/thread_pool.rb', line 25
    def full?
      @active_work >= @max_size
    end
#kill ⇒ Object
    # File 'lib/protobuf/nats/thread_pool.rb', line 47
    def kill
      @shutting_down = true
      @workers.map(&:kill)
    end
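Unlike #shutdown, #kill does not enqueue stop messages; it calls `Thread#kill` on every worker. The sketch below uses plain Ruby threads rather than the library to show the likely consequence: work that arrives after (or is still queued at) the kill is never executed. The `queue` and `worker` names are illustrative only.

```ruby
queue = Queue.new

# A single consumer that blocks on the queue, similar in spirit to a pool worker.
worker = Thread.new { loop { queue.pop.call } }

worker.kill # terminate immediately, without draining the queue
worker.join # wait until the thread has actually finished

queue << -> { :never_runs } # nobody is left to pop this item

worker_alive  = worker.alive?
pending_items = queue.size

puts "alive: #{worker_alive}, pending: #{pending_items}"
```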
#on_error(&cb) ⇒ Object
Registers the error callback. The callback is executed in a thread-safe manner.
    # File 'lib/protobuf/nats/thread_pool.rb', line 65
    def on_error(&cb)
      @error_cb = cb
    end
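The code that invokes the callback is private and not shown on this page. One plausible reading, given the `@cb_mutex` created in the constructor, is that each worker rescues errors and calls the callback while holding that mutex. The sketch below illustrates that pattern with plain threads; it is an assumption about the mechanism, not the gem's implementation.

```ruby
cb_mutex = Mutex.new
errors   = []

# Stand-in for the block passed to on_error.
error_cb = lambda { |error| errors << error.message }

threads = Array.new(4) do |i|
  Thread.new do
    begin
      raise ArgumentError, "job #{i} failed"
    rescue StandardError => e
      # Serialize callback execution so the callback body never runs concurrently.
      cb_mutex.synchronize { error_cb.call(e) }
    end
  end
end
threads.each(&:join)

puts errors.sort
```

Serializing the callback this way means the block can mutate shared state (here, the `errors` array) without its own locking.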
#push(&work_cb) ⇒ Object
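Enqueues a block of work and returns true, or returns false if the pool is full or shutting down. Not thread-safe by design: the IO model is a single producer thread with multiple consumer threads, so call this method from the producer thread only.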
    # File 'lib/protobuf/nats/thread_pool.rb', line 31
    def push(&work_cb)
      return false if full?
      return false if @shutting_down
      @queue << [:work, work_cb]
      @mutex.synchronize { @active_work += 1 }
      supervise_workers
      true
    end
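The boolean return value of #push acts as backpressure: the producer learns immediately when the pool cannot accept more work. The sketch below reproduces that behavior with `SketchPool`, a hypothetical stand-in written for this example. Its `push` and `full?` follow the listings on this page, but the worker loop is an assumption, since the gem's private `supervise_workers` is not shown here.

```ruby
# Hypothetical stand-in for illustration; not the gem's class.
class SketchPool
  def initialize(size, opts = {})
    @queue = Queue.new
    @mutex = Mutex.new
    @active_work = 0
    @max_size = size + opts[:max_queue].to_i
    @shutting_down = false
    @workers = Array.new(size) { Thread.new { work_loop } }
  end

  def full?
    @active_work >= @max_size
  end

  def push(&work_cb)
    return false if full?
    return false if @shutting_down
    @queue << [:work, work_cb]
    @mutex.synchronize { @active_work += 1 }
    true
  end

  private

  # Assumed worker loop: run each block, then release its capacity slot.
  def work_loop
    loop do
      type, cb = @queue.pop
      break if type == :stop
      begin
        cb.call
      ensure
        @mutex.synchronize { @active_work -= 1 }
      end
    end
  end
end

gate = Queue.new # keeps jobs blocked so the pool stays occupied
pool = SketchPool.new(1, max_queue: 1)

first  = pool.push { gate.pop } # occupies the single worker
second = pool.push { gate.pop } # occupies the single queue slot
third  = pool.push { gate.pop } # rejected: capacity is 1 worker + 1 queued

2.times { gate << :go } # let the accepted jobs finish

puts [first, second, third].inspect
```

A producer that receives false can drop the message, retry later, or signal the sender to slow down; which of these is appropriate depends on the caller.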
#shutdown ⇒ Object
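Marks the pool as shutting down and enqueues one stop message per worker. Not thread-safe by design: the IO model is a single producer thread with multiple consumer threads, so call this method from the producer thread only.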
    # File 'lib/protobuf/nats/thread_pool.rb', line 42
    def shutdown
      @shutting_down = true
      @max_workers.times { @queue << [:stop, nil] }
    end
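Because the stop messages go through the same FIFO queue as the work, work enqueued before #shutdown is popped before any worker sees a stop message. The sketch below shows the sentinel pattern with plain Ruby threads; the consumer loop is an assumed shape for the workers, which are private in the gem.

```ruby
queue     = Queue.new
processed = Queue.new

# Three consumers that exit when they pop a :stop message.
workers = Array.new(3) do
  Thread.new do
    loop do
      type, payload = queue.pop
      break if type == :stop
      processed << payload.call
    end
  end
end

# Producer: enqueue work, then one stop message per worker.
5.times { |i| queue << [:work, -> { i * 2 }] }
workers.size.times { queue << [:stop, nil] }

workers.each(&:join)

results = []
results << processed.pop until processed.empty?
results.sort!

puts results.inspect
```

Each worker consumes exactly one stop message and then exits, which is why the number of sentinels matches the number of workers.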
#size ⇒ Object
    # File 'lib/protobuf/nats/thread_pool.rb', line 69
    def size
      @active_work
    end
#wait_for_termination(seconds = nil) ⇒ Object
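Blocks until all workers have exited or, if seconds is given, until that many seconds have elapsed. Not thread-safe by design: the IO model is a single producer thread with multiple consumer threads, so call this method from the producer thread only.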
    # File 'lib/protobuf/nats/thread_pool.rb', line 54
    def wait_for_termination(seconds = nil)
      started_at = ::Time.now
      loop do
        sleep 0.1
        break if seconds && (::Time.now - started_at) >= seconds
        break if @workers.empty?
        prune_dead_workers
      end
    end
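The loop above polls every 0.1 seconds, so it returns at most roughly one polling interval after the last worker exits or the timeout passes. The sketch below repeats the loop as a standalone function. `wait_for_threads` is a hypothetical name, and the `reject!` call is an assumption about what the private `prune_dead_workers` does (dropping threads that are no longer alive).

```ruby
# Hypothetical standalone version of the polling loop.
# Returns true if every thread finished, false if the timeout expired first.
def wait_for_threads(workers, seconds = nil)
  started_at = Time.now
  loop do
    sleep 0.1
    break if seconds && (Time.now - started_at) >= seconds
    break if workers.empty?
    workers.reject! { |w| !w.alive? } # assumed behavior of prune_dead_workers
  end
  workers.empty?
end

# A worker that finishes quickly: the wait ends before the timeout.
quick = [Thread.new { sleep 0.05 }]
finished = wait_for_threads(quick, 2)

# A worker that never finishes: the wait gives up after the timeout.
stuck = [Thread.new { sleep }]
timed_out = !wait_for_threads(stuck, 0.3)
stuck.each(&:kill)

puts "finished: #{finished}, timed out: #{timed_out}"
```

Note that the original method does not report which of the two conditions ended the wait; a caller that needs to know can check whether any workers remain, or fall back to #kill after the timeout.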