Class: Orchestra::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/orchestra/thread_pool.rb

Defined Under Namespace

Classes: Job

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



15
16
17
18
19
20
21
22
# File 'lib/orchestra/thread_pool.rb', line 15

def initialize args = {}
  @timeout, _ = Util.extract_key_args args, :timeout_ms => 1000
  @threads = Set.new
  @dying = Queue.new
  @pool_lock = Mutex.new
  @queue = Queue.new
  @jobs = {}
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/orchestra/thread_pool.rb', line 13

def queue
  @queue
end

Class Method Details

.build(count) ⇒ Object



3
4
5
6
7
# File 'lib/orchestra/thread_pool.rb', line 3

def self.build count
  instance = new
  instance.count = count
  instance
end

.defaultObject



9
10
11
# File 'lib/orchestra/thread_pool.rb', line 9

def self.default
  build 1
end

Instance Method Details

#add_threadObject



52
53
54
# File 'lib/orchestra/thread_pool.rb', line 52

def add_thread
  while_locked do add_thread! end
end

#countObject



36
37
38
# File 'lib/orchestra/thread_pool.rb', line 36

def count
  threads.size
end

#count=(new_count) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/orchestra/thread_pool.rb', line 40

def count= new_count
  while_locked do
    loop do
      case @threads.size <=> new_count
      when 0 then return
      when -1 then add_thread!
      when 1 then remove_thread!
      end
    end
  end
end

#enqueue(&work) ⇒ Object



24
25
26
27
28
29
# File 'lib/orchestra/thread_pool.rb', line 24

def enqueue &work
  job = Job.new work
  job.add_observer self
  while_locked do queue << job end
  job
end

#execute(&work) ⇒ Object



31
32
33
34
# File 'lib/orchestra/thread_pool.rb', line 31

def execute &work
  job = enqueue &work
  job.wait
end

#remove_threadObject



56
57
58
# File 'lib/orchestra/thread_pool.rb', line 56

def remove_thread
  while_locked do remove_thread! end
end

#shutdownObject



60
61
62
# File 'lib/orchestra/thread_pool.rb', line 60

def shutdown
  self.count = 0
end

#statusObject



64
65
66
# File 'lib/orchestra/thread_pool.rb', line 64

def status
  while_locked do @threads.map &:status end
end

#threadsObject



68
69
70
# File 'lib/orchestra/thread_pool.rb', line 68

def threads
  while_locked do @threads end
end

#update(event) ⇒ Object



72
73
74
75
# File 'lib/orchestra/thread_pool.rb', line 72

def update event, *;
  return unless event == :failed
  reap_thread
end