Class: Protobuf::Nats::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/protobuf/nats/thread_pool.rb', line 5

# Sets up the pool's internal state and then calls supervise_workers.
#
# @param size [Integer] maximum number of workers (stored as @max_workers)
# @param opts [Hash] optional settings
# @option opts [#to_i] :max_queue extra work items accepted beyond `size`;
#   a missing key counts as 0
def initialize(size, opts = {})
  @queue = ::Queue.new
  @active_work = 0

  # Callbacks: default error handler ignores the error
  @error_cb = lambda { |_error| }

  # Synchronization
  @mutex = ::Mutex.new
  @cb_mutex = ::Mutex.new

  # nil.to_i is already 0, so a missing :max_queue needs no extra fallback
  queue_size = opts[:max_queue].to_i

  # Capacity: in-flight work may reach worker count plus queue allowance
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = false
  @workers = []
  supervise_workers
end

Instance Method Details

#full? ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/protobuf/nats/thread_pool.rb', line 25

# Reports whether the pool is at capacity, i.e. the @active_work counter has
# reached (or passed) @max_size.
#
# @return [Boolean]
def full?
  @max_size <= @active_work
end

#kill ⇒ Object



47
48
49
50
# File 'lib/protobuf/nats/thread_pool.rb', line 47

# Flags the pool as shutting down and sends #kill to every entry in @workers.
#
# @return [Array] the values returned by each worker's #kill
def kill
  @shutting_down = true
  @workers.map { |worker| worker.kill }
end

#on_error(&cb) ⇒ Object

This callback is executed in a thread-safe manner.



65
66
67
# File 'lib/protobuf/nats/thread_pool.rb', line 65

# Registers the given block as the pool's error callback by storing it in
# @error_cb, replacing whatever callback was stored before.
#
# NOTE(review): when called without a block, `cb` is nil and nil is stored —
# confirm that the code invoking @error_cb tolerates that.
#
# @return [Proc, nil] the block that was stored
def on_error(&cb)
  @error_cb = cb
end

#push(&work_cb) ⇒ Object

This method is not thread-safe by design, since our IO model is a single producer thread with multiple consumer threads.



31
32
33
34
35
36
37
38
# File 'lib/protobuf/nats/thread_pool.rb', line 31

# Enqueues a unit of work for the pool.
#
# The block is rejected (and nothing is enqueued) when the pool is full or
# has been flagged as shutting down. Otherwise a [:work, block] entry is put
# on @queue, the @active_work counter is incremented under @mutex, and
# supervise_workers is called.
#
# @return [Boolean] true when the work was enqueued, false when rejected
def push(&work_cb)
  return false if full? || @shutting_down

  @queue.push([:work, work_cb])
  @mutex.synchronize do
    @active_work += 1
  end
  supervise_workers

  true
end

#shutdown ⇒ Object

This method is not thread-safe by design, since our IO model is a single producer thread with multiple consumer threads.



42
43
44
45
# File 'lib/protobuf/nats/thread_pool.rb', line 42

# Flags the pool as shutting down and enqueues one [:stop, nil] entry per
# configured worker (@max_workers entries in total).
#
# @return [Integer] @max_workers, as returned by Integer#times
def shutdown
  @shutting_down = true
  @max_workers.times do
    @queue.push([:stop, nil])
  end
end

#size ⇒ Object



69
70
71
# File 'lib/protobuf/nats/thread_pool.rb', line 69

# Returns the current value of the @active_work counter.
#
# NOTE(review): the counter is read here without taking @mutex, so the value
# is a snapshot — confirm callers only need an approximate figure.
def size
  @active_work
end

#wait_for_termination(seconds = nil) ⇒ Object

This method is not thread-safe by design, since our IO model is a single producer thread with multiple consumer threads.



54
55
56
57
58
59
60
61
62
# File 'lib/protobuf/nats/thread_pool.rb', line 54

# Blocks the caller, polling every 0.1 seconds, until either @workers is
# empty or the optional timeout has elapsed. While waiting, each iteration
# calls prune_dead_workers (defined elsewhere in the class; presumably it
# drops finished workers from @workers — verify).
#
# Elapsed time is measured with the monotonic clock so the timeout is not
# affected by wall-clock adjustments (NTP corrections, manual changes).
#
# @param seconds [Numeric, nil] maximum time to wait; nil waits indefinitely
# @return [nil]
def wait_for_termination(seconds = nil)
  started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  loop do
    sleep 0.1
    break if seconds && (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_at) >= seconds
    break if @workers.empty?
    prune_dead_workers
  end
end