Class: Zapp::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/zapp/worker_pool.rb

Overview

Manages and dispatches work to a pool of Zapp::Worker instances

Constant Summary collapse

# Control messages sent through the context pipe in place of a request context.
# EXIT is what #drain sends, once per configured unit of parallelism
# (presumably telling a worker to stop; confirm against the worker code).
SIGNALS = { EXIT: :exit }.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(context_pipe:, socket_pipe:) ⇒ WorkerPool

Returns a new instance of WorkerPool.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/zapp/worker_pool.rb', line 12

# Builds the pool: one Worker per unit of configured parallelism.
#
# @param context_pipe [Object] pipe the pool writes to in #process; also handed to every worker
# @param socket_pipe [Object] handed to every worker; not stored by the pool
def initialize(context_pipe:, socket_pipe:)
  @context_pipe = context_pipe
  # Back the `parallelism` reader: it was never assigned before, so it always returned nil.
  @parallelism = Zapp.config.parallelism
  # Each worker receives its zero-based position in the pool as `index`.
  @workers = @parallelism.times.map do |index|
    Worker.new(context_pipe: context_pipe, socket_pipe: socket_pipe, index: index)
  end
end

Instance Attribute Details

#context_pipe ⇒ Object (readonly)

Returns the value of attribute context_pipe.



6
7
8
# File 'lib/zapp/worker_pool.rb', line 6

# Reader for the pipe given to the constructor as `context_pipe:`.
#
# @return [Object] the current value of @context_pipe
def context_pipe = @context_pipe

#parallelism ⇒ Object (readonly)

Returns the value of attribute parallelism.



6
7
8
# File 'lib/zapp/worker_pool.rb', line 6

# Reader for the pool's parallelism setting.
#
# @return [Object, nil] the current value of @parallelism (nil if it was never assigned)
def parallelism = @parallelism

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



6
7
8
# File 'lib/zapp/worker_pool.rb', line 6

# Reader for the list of workers built by the constructor.
#
# @return [Array, nil] the current value of @workers
def workers = @workers

Instance Method Details

#drain ⇒ Object

Finishes processing of all requests and shuts down workers



30
31
32
33
34
35
36
37
# File 'lib/zapp/worker_pool.rb', line 30

# Shuts the pool down: first queues one EXIT signal per configured unit of
# parallelism through #process, then calls #terminate on every worker.
#
# @return [Array] the result of each worker's #terminate, with nil in the
#   slot of any worker that raised Ractor::ClosedError
def drain
  signal_count = Zapp.config.parallelism
  signal_count.times { process(context: SIGNALS[:EXIT]) }

  workers.map do |worker|
    begin
      worker.terminate
    rescue Ractor::ClosedError
      # Ractor has already exited
      nil
    end
  end
end

#process(context:) ⇒ Object

Sends a context to the workers through the context pipe



25
26
27
# File 'lib/zapp/worker_pool.rb', line 25

# Hands a single item to the pool by writing it to the context pipe.
#
# @param context [Object] the item to dispatch; #drain also passes SIGNALS[:EXIT] here
# @return [Object] whatever the pipe's #send returns
def process(context:) = context_pipe.send(context)