Class: ConcurrentPipeline::Processors::ActorProcessor::ActorPool

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/processors/actor_processor.rb

Defined Under Namespace

Classes: Pool

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency = 10000) ⇒ ActorPool

Returns a new instance of ActorPool.



102
103
104
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 102

# Creates the wrapper and stores the result of Pool.spawn in @pool.
#
# @param concurrency [Integer] forwarded unchanged to Pool.spawn; defaults
#   to 10_000 (presumably the pool's size limit -- confirm against Pool)
def initialize(concurrency = 10_000)
  @pool = Pool.spawn(concurrency)
end

Instance Attribute Details

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



101
102
103
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 101

# Reader for @pool, the object the constructor obtains from Pool.spawn.
#
# @return [Object] the current value of @pool
def pool
  @pool
end

Instance Method Details

#process(bodies) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 106

# Hands every body to the pool, then waits for each one to finish.
#
# Works in two passes: first every body is wrapped in a Message tagged
# :queue and passed to pool.ask; only once all bodies have been submitted
# is each reply waited on via terminated.value!.
#
# Waiting here is deliberate. This method is invoked from perform, which
# already runs inside an actor, so blocking is acceptable. Going through
# the single pool keeps the pool limited in size across all actors.
#
# @param bodies [Array] work items; each becomes the payload of one message
# @return [Array] one value! result per body, in the same order as bodies
# NOTE(review): value! presumably re-raises a failure of the underlying
# work -- confirm against the future implementation in use.
def process(bodies)
  replies = bodies.map do |body|
    message = Processors::ActorProcessor::Message.new(:queue, body)
    pool.ask(message)
  end

  replies.map { |reply| reply.terminated.value! }
end