Class: ConcurrentPipeline::Processors::ActorProcessor::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
PipeActor
Defined in:
lib/concurrent_pipeline/processors/actor_processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PipeActor

default_block, extended, on_blocks, spawn

Constructor Details

#initialize(dispatch:, store:, pipelineables:, stream:) ⇒ Scheduler

Returns a new instance of Scheduler.



198
199
200
201
202
203
204
205
206
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 198

# Builds a scheduler from the collaborators handed in by the caller and
# prepares empty bookkeeping state.
#
# @param dispatch [Object] stored in @dispatch and exposed through #dispatch
# @param store [Object] stored in @store and exposed through #store
# @param pipelineables [Object] stored in @pipelineables and exposed through #pipelineables
# @param stream [Object] stored in @stream and exposed through #stream
def initialize(dispatch:, store:, pipelineables:, stream:)
  # Shared pool created without a size argument; #pool_for hands it out
  # for pipelineables that report no concurrency.
  @unlimited_pool = ActorPool.new

  # Injected collaborators, kept exactly as received.
  @stream = stream
  @pipelineables = pipelineables
  @store = store
  @dispatch = dispatch

  # Bookkeeping that starts out empty.
  @status = {}
  @pools = {}
end

Instance Attribute Details

#dispatch ⇒ Object (readonly)

Returns the value of attribute dispatch.



197
198
199
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 197

# Reader for the dispatch collaborator assigned in the constructor.
#
# @return [Object] the current value of @dispatch
def dispatch = @dispatch

#pipelineables ⇒ Object (readonly)

Returns the value of attribute pipelineables.



197
198
199
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 197

# Reader for the pipelineables collaborator assigned in the constructor.
#
# @return [Object] the current value of @pipelineables
def pipelineables = @pipelineables

#status ⇒ Object (readonly)

Returns the value of attribute status.



197
198
199
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 197

# Reader for the status bookkeeping, which the constructor initialises
# to an empty Hash.
#
# @return [Object] the current value of @status
def status = @status

#store ⇒ Object (readonly)

Returns the value of attribute store.



197
198
199
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 197

# Reader for the store collaborator assigned in the constructor.
#
# @return [Object] the current value of @store
def store = @store

#stream ⇒ Object (readonly)

Returns the value of attribute stream.



197
198
199
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 197

# Reader for the stream collaborator assigned in the constructor.
#
# @return [Object] the current value of @stream
def stream = @stream

Instance Method Details

#pool_for(pipelineable) ⇒ Object



208
209
210
211
212
213
214
215
216
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 208

# Looks up (and caches) the actor pool assigned to a pipelineable.
#
# The first request for a given pipelineable picks its pool: a new
# ActorPool sized by `pipelineable.concurrency` when that value is truthy,
# otherwise the shared @unlimited_pool. The choice is stored in @pools so
# later calls with the same key return the same pool object.
#
# @param pipelineable [#concurrency] key used to look up the pool
# @return [Object] the pool cached for this pipelineable
def pool_for(pipelineable)
  cached = @pools[pipelineable]
  return cached if cached

  @pools[pipelineable] =
    pipelineable.concurrency ? ActorPool.new(pipelineable.concurrency) : @unlimited_pool
end