Class: ConcurrentPipeline::Processors::ActorProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/processors/actor_processor.rb

Defined Under Namespace

Modules: PipeActor Classes: ActorPool, Changeset, Dispatch, Message, Scheduler, Ticker, Work

Constant Summary collapse

Ms =
Module.new do
  def self.g(...)
    Message.new(...)
  end

  def msg(...)
    Message.new(...)
  end
end
Msg =

->(*args, **opts) { Message.new(*args, **opts) }

Message.method(:new)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store:, pipelineables:, registry:, stream:) ⇒ ActorProcessor

Returns a new instance of ActorProcessor.



37
38
39
40
41
42
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 37

def initialize(store:, pipelineables:, registry:, stream:)
  @store = store
  @pipelineables = pipelineables
  @registry = registry
  @stream = stream
end

Instance Attribute Details

#pipelineablesObject (readonly)

Returns the value of attribute pipelineables.



36
37
38
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36

def pipelineables
  @pipelineables
end

#registryObject (readonly)

Returns the value of attribute registry.



36
37
38
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36

def registry
  @registry
end

#storeObject (readonly)

Returns the value of attribute store.



36
37
38
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36

def store
  @store
end

#streamObject (readonly)

Returns the value of attribute stream.



36
37
38
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 36

def stream
  @stream
end

Class Method Details

.callObject



32
33
34
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 32

def self.call(...)
  new(...).call
end

Instance Method Details

#callObject



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/concurrent_pipeline/processors/actor_processor.rb', line 335

def call
  dispatch = Dispatch.spawn
  ticker = Ticker.spawn(dispatch)
  work = Work.spawn(dispatch)
  changeset = Changeset.spawn(dispatch: dispatch, store: store)
  scheduler = Scheduler.spawn(
    dispatch: dispatch,
    store: store,
    pipelineables: pipelineables,
    stream: stream
  )

  dispatch.tell(Msg.(
    :init,
    work: work,
    changeset: changeset,
    scheduler: scheduler
  ))

  Log.debug("triggering initial queue")

  ticker.tell(Msg.(:start))
  scheduler.tell(Msg.(:requeue))

  dispatch.terminated.result
end