Class: Protobuf::Nats::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/protobuf/nats/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/protobuf/nats/thread_pool.rb', line 5

# Sets up the pool's queue, counters and locks, then starts supervising
# workers.
#
# @param size [Integer] number of worker slots; stored as @max_workers
# @param opts [Hash] optional settings
# @option opts [#to_i] :max_queue (0) extra jobs accepted on top of +size+
#   before #full? reports true
def initialize(size, opts = {})
  @queue = ::Queue.new
  @active_work = 0

  # Callbacks: the default error handler is a no-op until #on_error
  # replaces it.
  @error_cb = lambda { |_error| }

  # Synchronization
  @mutex = ::Mutex.new
  @cb_mutex = ::Mutex.new

  # nil.to_i is already 0 and to_i never returns nil, so a missing
  # :max_queue needs no explicit fallback.
  queue_size = opts[:max_queue].to_i
  @max_size = size + queue_size
  @max_workers = size
  @shutting_down = false
  @workers = []
  supervise_workers
end

Instance Method Details

#enqueued_size ⇒ Object



25
26
27
# File 'lib/protobuf/nats/thread_pool.rb', line 25

# Number of entries currently sitting in the internal queue.
#
# @return [Integer]
def enqueued_size
  @queue.length
end

#full? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/protobuf/nats/thread_pool.rb', line 29

# Whether the active work counter has reached the configured maximum.
#
# @return [Boolean] true when no capacity is left
def full?
  remaining_slots = @max_size - @active_work
  remaining_slots <= 0
end

#kill ⇒ Object



55
56
57
58
# File 'lib/protobuf/nats/thread_pool.rb', line 55

# Flags the pool as shutting down and calls +kill+ on every entry in
# @workers.
#
# @return [Array] whatever each worker's +kill+ returned, in order
def kill
  @shutting_down = true
  @workers.map { |worker| worker.kill }
end

#max_size ⇒ Object



33
34
35
# File 'lib/protobuf/nats/thread_pool.rb', line 33

# Returns the capacity limit held in @max_size. The constructor sets it to
# the worker count plus the optional :max_queue allowance, and #full?
# compares the active work counter against it.
def max_size
  @max_size
end

#on_error(&cb) ⇒ Object

This callback is executed in a thread safe manner.



73
74
75
# File 'lib/protobuf/nats/thread_pool.rb', line 73

# Stores the given block in @error_cb, replacing whatever handler was there
# before. Calling it without a block stores nil.
#
# @return [Proc, nil] the block that was just stored
def on_error(&handler)
  @error_cb = handler
end

#push(&work_cb) ⇒ Object

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.



39
40
41
42
43
44
45
46
# File 'lib/protobuf/nats/thread_pool.rb', line 39

# Enqueues a block of work unless the pool is full or shutting down.
#
# On acceptance the block is queued as a [:work, block] pair, the active
# work counter is bumped under @mutex, and supervise_workers is called.
#
# @return [Boolean] true if the work was accepted, false if it was rejected
def push(&work_cb)
  return false if full? || @shutting_down

  @queue.push([:work, work_cb])
  @mutex.synchronize { @active_work += 1 }
  supervise_workers
  true
end

#shutdown ⇒ Object

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.



50
51
52
53
# File 'lib/protobuf/nats/thread_pool.rb', line 50

# Marks the pool as shutting down (so #push rejects new work) and enqueues
# one [:stop, nil] entry per worker slot.
#
# @return [Integer] @max_workers, as returned by Integer#times
def shutdown
  @shutting_down = true
  @max_workers.times { @queue.push([:stop, nil]) }
end

#size ⇒ Object



77
78
79
# File 'lib/protobuf/nats/thread_pool.rb', line 77

# Returns the current @active_work counter. #push increments it once for
# every job it accepts; where it is decremented is not visible in this
# section (presumably when a worker finishes a job -- TODO confirm).
def size
  @active_work
end

#wait_for_termination(seconds = nil) ⇒ Object

This method is not thread safe by design since our IO model is a single producer thread with multiple consumer threads.



62
63
64
65
66
67
68
69
70
# File 'lib/protobuf/nats/thread_pool.rb', line 62

# Blocks until every worker has been pruned from @workers or the optional
# timeout elapses, polling at most every 0.1 seconds.
#
# Dead workers are pruned before the first emptiness check, so a pool
# whose workers have already exited returns without sleeping. The timeout
# is measured on the monotonic clock so wall-clock adjustments cannot
# shorten or extend the wait.
#
# @param seconds [Numeric, nil] maximum time to wait; nil waits indefinitely
# @return [nil]
def wait_for_termination(seconds = nil)
  deadline = seconds && (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + seconds)

  loop do
    prune_dead_workers
    break if @workers.empty?

    pause = 0.1
    if deadline
      remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      # Never sleep past the deadline.
      pause = remaining if remaining < pause
    end
    sleep pause
  end
end