Class: Workers::Pool
- Inherits:
-
Object
show all
- Includes:
- Helpers
- Defined in:
- lib/workers/pool.rb
Constant Summary
collapse
- DEFAULT_POOL_SIZE =
20
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Helpers
#concat_e, #log_debug, #log_error, #log_info, #log_warn
Constructor Details
#initialize(options = {}) ⇒ Pool
Returns a new instance of Pool.
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# File 'lib/workers/pool.rb', line 9
def initialize(options = {})
@logger = Workers::LogProxy.new(options[:logger])
@worker_class = options[:worker_class] || Workers::Worker
@input_queue = Queue.new
@lock = Monitor.new
@workers = Set.new
@size = 0
@on_exception = options[:on_exception]
expand(options[:size] || Workers::Pool::DEFAULT_POOL_SIZE)
nil
end
|
Instance Attribute Details
#on_exception ⇒ Object
Returns the value of attribute on_exception.
7
8
9
|
# File 'lib/workers/pool.rb', line 7
def on_exception
@on_exception
end
|
Instance Method Details
#contract(count, &block) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/workers/pool.rb', line 83
def contract(count, &block)
@lock.synchronize do
raise Workers::PoolSizeError, 'Count is too large.' if count > @size
count.times do
callback = Proc.new do |worker|
remove_worker(worker)
block.call if block
end
enqueue(:shutdown, callback)
@size -= 1
end
end
nil
end
|
#dispose(max_wait = nil, &block) ⇒ Object
53
54
55
56
57
58
59
|
# File 'lib/workers/pool.rb', line 53
def dispose(max_wait = nil, &block)
shutdown do
block.call if block
end
join(max_wait)
end
|
#enqueue(command, data = nil) ⇒ Object
23
24
25
26
27
|
# File 'lib/workers/pool.rb', line 23
def enqueue(command, data = nil)
@input_queue.push(Event.new(command, data))
nil
end
|
#expand(count) ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/workers/pool.rb', line 71
def expand(count)
@lock.synchronize do
count.times do
worker = @worker_class.new(:input_queue => @input_queue, :on_exception => @on_exception, :logger => @logger)
@workers << worker
@size += 1
end
end
nil
end
|
#inspect ⇒ Object
61
62
63
|
# File 'lib/workers/pool.rb', line 61
def inspect
"#<#{self.class.to_s}:0x#{(object_id << 1).to_s(16)} size=#{size}>"
end
|
#join(max_wait = nil) ⇒ Object
45
46
47
48
49
50
51
|
# File 'lib/workers/pool.rb', line 45
def join(max_wait = nil)
results = @workers.map { |w| w.join(max_wait) }
@workers.clear
@size = 0
results
end
|
29
30
31
32
33
|
# File 'lib/workers/pool.rb', line 29
def perform(&block)
enqueue(:perform, block)
nil
end
|
#resize(new_size) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/workers/pool.rb', line 101
def resize(new_size)
@lock.synchronize do
if new_size > @size
expand(new_size - @size)
elsif new_size < @size
contract(@size - new_size)
end
end
nil
end
|
#shutdown(&block) ⇒ Object
35
36
37
38
39
40
41
42
43
|
# File 'lib/workers/pool.rb', line 35
def shutdown(&block)
@lock.synchronize do
@size.times do
enqueue(:shutdown, block)
end
end
nil
end
|
#size ⇒ Object
65
66
67
68
69
|
# File 'lib/workers/pool.rb', line 65
def size
@lock.synchronize do
@size
end
end
|