Class: Async::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/async/worker_pool.rb

Defined Under Namespace

Classes: Error, StoppedError

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workers: 1, queue_limit: 1, parent: Async::Task.current, &block) ⇒ WorkerPool

Returns a new instance of WorkerPool.



18
19
20
21
22
23
24
25
26
# File 'lib/async/worker_pool.rb', line 18

def initialize(workers: 1, queue_limit: 1, parent: Async::Task.current, &block)
  @queue_limit = queue_limit
  @parent = parent
  @block = block

  @semaphore = Async::Semaphore.new(workers, parent: @parent)
  @channel = Async::Channel.new(@queue_limit, parent: @semaphore)
  @task = start
end

Class Method Details

.startObject



9
10
11
# File 'lib/async/worker_pool.rb', line 9

def start(...)
  new(...)
end

.with(*args, **params) ⇒ Object



13
14
15
# File 'lib/async/worker_pool.rb', line 13

def with(*args, **params, &)
  new(*args, **params).with(&)
end

Instance Method Details

#busyObject



29
# File 'lib/async/worker_pool.rb', line 29

def busy = @semaphore.count

#call(*args, **params, &block) ⇒ Object

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
# File 'lib/async/worker_pool.rb', line 37

def call(*args, **params, &block)
  block ||= @block
  raise ArgumentError, "Block must be passed to #schedule if it's not passed to #initlaize" if block.nil?

  raise StoppedError, "The pool was stopped" unless running?

  Async::ResultNotification.new.tap do |notification|
    @channel.enqueue([notification, [args, params], block])
  end
end

#running?Boolean

Returns:

  • (Boolean)


35
# File 'lib/async/worker_pool.rb', line 35

def running? = @channel.open?

#schedule_all(tasks, &block) ⇒ Object

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/async/worker_pool.rb', line 48

def schedule_all(tasks, &block)
  block ||= @block

  raise ArgumentError, "Block must be passed to #schedule_all if it's not passed to #initlaize" if block.nil?

  raise StoppedError, "The pool was stopped" unless running?

  tasks = tasks.map { |task| [Async::ResultNotification.new, [[task], {}], block] }

  @channel.enqueue_all(tasks)
  tasks.map(&:first)
end

#stopObject



30
# File 'lib/async/worker_pool.rb', line 30

def stop = @channel.close

#stopped?Boolean

Returns:

  • (Boolean)


34
# File 'lib/async/worker_pool.rb', line 34

def stopped? = !running?

#waitObject



32
# File 'lib/async/worker_pool.rb', line 32

def wait = @task.wait

#waitingObject



31
# File 'lib/async/worker_pool.rb', line 31

def waiting = @semaphore.waiting.size

#withObject



61
62
63
64
65
66
# File 'lib/async/worker_pool.rb', line 61

def with
  yield(self)
ensure
  stop
  wait
end

#workersObject



28
# File 'lib/async/worker_pool.rb', line 28

def workers = @semaphore.limit