Class: Async::WorkerPool
- Inherits:
-
Object
show all
- Defined in:
- lib/async/worker_pool.rb
Defined Under Namespace
Classes: Error, StoppedError
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(workers: 1, queue_limit: 1, parent: Async::Task.current, &block) ⇒ WorkerPool
Returns a new instance of WorkerPool.
18
19
20
21
22
23
24
25
26
|
# File 'lib/async/worker_pool.rb', line 18
# Stores the pool configuration, builds the semaphore and the channel, then
# calls #start and keeps whatever it returns in @task.
#
# @param workers [Object] limit handed to Async::Semaphore.new (defaults to 1)
# @param queue_limit [Object] size handed to Async::Channel.new (defaults to 1)
# @param parent [Object] parent given to the semaphore; defaults to Async::Task.current
# @param block [Proc, nil] fallback block used by #call and #schedule_all when they receive none
def initialize(workers: 1, queue_limit: 1, parent: Async::Task.current, &block)
  @block = block
  @parent = parent
  @queue_limit = queue_limit
  @semaphore = Async::Semaphore.new(workers, parent: parent)
  # The channel is parented to the semaphore rather than to `parent` directly.
  @channel = Async::Channel.new(queue_limit, parent: @semaphore)
  # NOTE(review): the instance-level #start is not visible here; presumably it
  # spawns the task that drains the channel — confirm.
  @task = start
end
|
Class Method Details
.start ⇒ Object
9
10
11
|
# File 'lib/async/worker_pool.rb', line 9
# Forwards every argument (positional, keyword and block) to .new and returns the new instance.
def start(...) = new(...)
|
.with(*args, **params) ⇒ Object
13
14
15
|
# File 'lib/async/worker_pool.rb', line 13
# Builds an instance from the given arguments and hands the block to its
# instance-level #with; returns whatever that call returns.
#
# @param args [Array] positional arguments for .new
# @param params [Hash] keyword arguments for .new
def with(*args, **params, &)
  pool = new(*args, **params)
  pool.with(&)
end
|
Instance Method Details
#busy ⇒ Object
29
|
# File 'lib/async/worker_pool.rb', line 29
# Reports the semaphore's current count.
# Presumably the number of workers holding a slot right now — confirm against Async::Semaphore#count.
#
# @return [Object] the value of @semaphore.count
def busy
  @semaphore.count
end
|
#call(*args, **params, &block) ⇒ Object
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/async/worker_pool.rb', line 37
# Enqueues one job on the channel and returns the notification created for it.
#
# The queued entry is `[notification, [args, params], block]`; the block given
# here takes precedence over the one stored by #initialize.
#
# @param args [Array] positional arguments stored with the job
# @param params [Hash] keyword arguments stored with the job
# @param block [Proc, nil] job body; falls back to @block when omitted
# @return [Async::ResultNotification] the notification enqueued with the job
# @raise [ArgumentError] if neither this call nor #initialize received a block
# @raise [StoppedError] if #running? is false
def call(*args, **params, &block)
  block ||= @block
  # NOTE(review): the message names #schedule while this method is #call —
  # confirm whether a #schedule alias exists before renaming it.
  raise ArgumentError, "Block must be passed to #schedule if it's not passed to #initialize" if block.nil?
  raise StoppedError, "The pool was stopped" unless running?

  Async::ResultNotification.new.tap do |notification|
    @channel.enqueue([notification, [args, params], block])
  end
end
|
#running? ⇒ Boolean
35
|
# File 'lib/async/worker_pool.rb', line 35
# Tells whether the pool still accepts work, as reported by the channel.
#
# @return [Object] the value of @channel.open?
def running?
  @channel.open?
end
|
#schedule_all(tasks, &block) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/async/worker_pool.rb', line 48
# Enqueues one job per element of `tasks` in a single #enqueue_all call and
# returns the notifications created for them, in the same order.
#
# Each element is passed as the job's only positional argument, with no
# keyword arguments: the queued entry is `[notification, [[task], {}], block]`.
#
# @param tasks [Enumerable] items to schedule; must respond to #map
# @param block [Proc, nil] job body; falls back to @block when omitted
# @return [Array<Async::ResultNotification>] one notification per task
# @raise [ArgumentError] if neither this call nor #initialize received a block
# @raise [StoppedError] if #running? is false
def schedule_all(tasks, &block)
  block ||= @block
  raise ArgumentError, "Block must be passed to #schedule_all if it's not passed to #initialize" if block.nil?
  raise StoppedError, "The pool was stopped" unless running?

  jobs = tasks.map { |task| [Async::ResultNotification.new, [[task], {}], block] }
  @channel.enqueue_all(jobs)
  jobs.map(&:first)
end
|
#stop ⇒ Object
30
|
# File 'lib/async/worker_pool.rb', line 30
# Closes the channel; afterwards #running? reflects the channel's closed state.
#
# @return [Object] the value of @channel.close
def stop
  @channel.close
end
|
#stopped? ⇒ Boolean
34
|
# File 'lib/async/worker_pool.rb', line 34
# Logical negation of #running?.
#
# @return [Boolean]
def stopped?
  !running?
end
|
#wait ⇒ Object
32
|
# File 'lib/async/worker_pool.rb', line 32
# Waits on the task stored by #initialize.
#
# @return [Object] the value of @task.wait
def wait
  @task.wait
end
|
#waiting ⇒ Object
31
|
# File 'lib/async/worker_pool.rb', line 31
# Size of the semaphore's waiting collection.
# Presumably the number of jobs blocked on a free worker slot — confirm against Async::Semaphore#waiting.
#
# @return [Object] the value of @semaphore.waiting.size
def waiting
  blocked = @semaphore.waiting
  blocked.size
end
|
#with ⇒ Object
61
62
63
64
65
66
|
# File 'lib/async/worker_pool.rb', line 61
# Yields the pool to the block, then always calls #stop followed by #wait,
# even when the block raises.
#
# @yieldparam pool [Object] self
# @return [Object] the block's return value
def with
  begin
    yield self
  ensure
    stop
    wait
  end
end
|
#workers ⇒ Object
28
|
# File 'lib/async/worker_pool.rb', line 28
# Reports the semaphore's limit, i.e. the `workers` value handed to the semaphore in #initialize.
#
# @return [Object] the value of @semaphore.limit
def workers
  @semaphore.limit
end
|