Class: ConcurrentPipeline::Processors::ActorProcessor
- Inherits: Object
  - Object
  - ConcurrentPipeline::Processors::ActorProcessor
- Defined in: lib/concurrent_pipeline/processors/actor_processor.rb
Defined Under Namespace
Modules: PipeActor Classes: ActorPool, Changeset, Dispatch, Message, Scheduler, Ticker, Work
Constant Summary
- Ms =
    Module.new do
      def self.g(...)
        Message.new(...)
      end

      def msg(...)
        Message.new(...)
      end
    end
- Msg =
->(*args, **opts) { Message.new(*args, **opts) }
Message.method(:new)
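Both constants listed above wrap `Message.new`. The sketch below illustrates the `Msg.(...)` call shorthand that `#call` uses to build messages. The `Message` struct and its `type` and `payload` fields are hypothetical stand-ins, since the real `Message` class is not shown on this page.

```ruby
# Hypothetical stand-in for ActorProcessor::Message; the real class and
# its fields are not documented on this page.
Message = Struct.new(:type, :payload) do
  def initialize(type, **payload)
    super(type, payload)
  end
end

# Same shape as the constant above: a Method object bound to Message.new.
Msg = Message.method(:new)

# `Msg.(...)` is shorthand for `Msg.call(...)`.
start = Msg.(:start)
init  = Msg.(:init, work: :work_actor)

puts start.type          # the message type passed as the first argument
puts init.payload.keys   # the keyword arguments collected into the payload
```

Binding `Message.method(:new)` to a short constant keeps call sites such as `ticker.tell(Msg.(:start))` compact without adding a wrapper method.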
Instance Attribute Summary
- #pipelineables ⇒ Object (readonly)
  Returns the value of attribute pipelineables.
- #registry ⇒ Object (readonly)
  Returns the value of attribute registry.
- #store ⇒ Object (readonly)
  Returns the value of attribute store.
- #stream ⇒ Object (readonly)
  Returns the value of attribute stream.
Class Method Summary
- .call ⇒ Object
Instance Method Summary
- #call ⇒ Object
- #initialize(store:, pipelineables:, registry:, stream:) ⇒ ActorProcessor (constructor)
  A new instance of ActorProcessor.
Constructor Details
#initialize(store:, pipelineables:, registry:, stream:) ⇒ ActorProcessor
Returns a new instance of ActorProcessor.
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 37
def initialize(store:, pipelineables:, registry:, stream:)
  @store = store
  @pipelineables = pipelineables
  @registry = registry
  @stream = stream
end
Instance Attribute Details
#pipelineables ⇒ Object (readonly)
Returns the value of attribute pipelineables.
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36
def pipelineables
  @pipelineables
end
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36
def registry
  @registry
end
#store ⇒ Object (readonly)
Returns the value of attribute store.
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36
def store
  @store
end
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36
def stream
  @stream
end
Class Method Details
.call ⇒ Object
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 32
def self.call(...)
  new(...).call
end
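The class-level `.call` forwards all of its arguments to `.new` and immediately invokes `#call` on the new instance. The sketch below shows the same forwarding pattern on a hypothetical `Greeter` class, which exists only for illustration and is not part of this library. The `...` forwarding syntax requires Ruby 2.7 or later.

```ruby
# Hypothetical class used only to illustrate the forwarding pattern.
class Greeter
  # Forward every argument (positional, keyword, block) to .new,
  # then run the resulting instance.
  def self.call(...)
    new(...).call
  end

  def initialize(name:, greeting: "Hello")
    @name = name
    @greeting = greeting
  end

  def call
    "#{@greeting}, #{@name}!"
  end
end

# Equivalent to Greeter.new(name: "pipeline").call
puts Greeter.call(name: "pipeline")
```

By analogy, `ActorProcessor.call(store: ..., pipelineables: ..., registry: ..., stream: ...)` should behave like constructing the processor with those keywords and then calling `#call` on it.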
Instance Method Details
#call ⇒ Object
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 335
def call
  dispatch = Dispatch.spawn
  ticker = Ticker.spawn(dispatch)
  work = Work.spawn(dispatch)
  changeset = Changeset.spawn(dispatch: dispatch, store: store)
  scheduler = Scheduler.spawn(
    dispatch: dispatch,
    store: store,
    pipelineables: pipelineables,
    stream: stream
  )

  dispatch.tell(Msg.(
    :init,
    work: work,
    changeset: changeset,
    scheduler: scheduler
  ))

  Log.debug("triggering initial queue")
  ticker.tell(Msg.(:start))
  scheduler.tell(Msg.(:requeue))
  dispatch.terminated.result
end
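`#call` spawns the actors, sends them their initial messages with `tell`, and then appears to block on `dispatch.terminated.result` until the dispatch actor finishes. The actors' real implementation (see `PipeActor`) is not shown on this page, so the following is only a stdlib sketch of that spawn, tell, and wait pattern. `TinyActor`, its `wait` method, and the `:stop` convention are assumptions made for illustration.

```ruby
# Hypothetical, stdlib-only stand-in for an actor; not the library's PipeActor.
class TinyActor
  def self.spawn(&handler)
    new(&handler)
  end

  def initialize(&handler)
    @mailbox = Queue.new
    # The thread drains the mailbox until the handler returns :stop.
    @thread = Thread.new do
      loop do
        message = @mailbox.pop
        break if handler.call(message) == :stop
      end
      :terminated
    end
  end

  # Enqueue a message without waiting for it to be handled.
  def tell(message)
    @mailbox << message
    self
  end

  # Block until the actor's loop has exited and return its final value.
  def wait
    @thread.value
  end
end

received = []
dispatch = TinyActor.spawn do |message|
  received << message
  message == :shutdown ? :stop : :continue
end

# Messages are queued in order and handled one at a time.
dispatch.tell(:init).tell(:requeue).tell(:shutdown)
result = dispatch.wait
```

Because `tell` only enqueues, the caller stays free to wire up further actors before any message is processed, and the final blocking wait keeps the calling thread alive until the work is done.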