Class: Bifrost::Manager

Inherits:
Object
  • Object
Includes:
Celluloid, Celluloid::Internals::Logger, Celluloid::Notifications
Defined in:
lib/bifrost/manager.rb

Overview

This class handles the setup and execution of multiple listeners.


Constructor Details

#initialize ⇒ Manager

Initialising the Manager subscribes it to the 'worker_ready' notification, which informs the manager when a worker is ready to begin work.



# File 'lib/bifrost/manager.rb', line 18

def initialize
  subscribe('worker_ready', :worker_ready)
end
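
The subscription above follows a publish/subscribe pattern. The sketch below illustrates that pattern with the standard library only; `TinyNotifier` and `DemoManager` are hypothetical stand-ins, not Celluloid's implementation. It assumes the callback receives the topic as its first argument, followed by the payload, which would be consistent with #worker_ready reading `args[1]` and `args[2]`.

```ruby
# Hypothetical stand-in for a notification hub; this is not Celluloid's implementation.
class TinyNotifier
  def initialize
    @subscriptions = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Register receiver so that method_name is called for every publish on topic.
  def subscribe(receiver, topic, method_name)
    @subscriptions[topic] << [receiver, method_name]
  end

  # Deliver the topic, followed by the payload, to every subscriber.
  def publish(topic, *payload)
    @subscriptions[topic].each do |receiver, method_name|
      receiver.public_send(method_name, topic, *payload)
    end
  end
end

# Hypothetical manager that records each notification it receives.
class DemoManager
  attr_reader :ready

  def initialize(notifier)
    @ready = []
    notifier.subscribe(self, 'worker_ready', :worker_ready)
  end

  def worker_ready(*args)
    @ready << args
  end
end

notifier = TinyNotifier.new
manager = DemoManager.new(notifier)
notifier.publish('worker_ready', 'orders', 'billing')
```

In this sketch the recorded arguments start with the topic name, so the payload values sit at indexes 1 and 2.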

Instance Method Details

#add(topic, subscriber, proc, options = {}) ⇒ Object

Adds a supervised worker to the current collection of supervised workers. This also starts the actor.



# File 'lib/bifrost/manager.rb', line 24

def add(topic, subscriber, proc, options = {})
  if topic.nil? || subscriber.nil? || proc.nil?
    raise InvalidWorkerDefinitionError, 'Invalid worker'
  else
    Worker.supervise(as: Worker.handle(topic, subscriber), args: [topic, subscriber, proc, append_default_options(options)])
  end
end
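
The nil check in #add can be exercised in isolation. The following is a minimal sketch, assuming a stand-in error class and a hypothetical helper named `validate_worker_definition!`; neither is part of Bifrost's API, and the sketch does not supervise any worker.

```ruby
# Stand-in for Bifrost's error class so the sketch runs without the gem.
class InvalidWorkerDefinitionError < StandardError; end

# Hypothetical helper mirroring the nil check performed by #add.
def validate_worker_definition!(topic, subscriber, callback)
  return unless [topic, subscriber, callback].any?(&:nil?)

  raise InvalidWorkerDefinitionError, 'Invalid worker'
end

handler = proc { |message| message.to_s.upcase }

# A complete definition passes silently.
accepted = validate_worker_definition!('orders', 'billing', handler)

# A missing subscriber is rejected with the same message #add uses.
rejected = begin
  validate_worker_definition!('orders', nil, handler)
  false
rescue InvalidWorkerDefinitionError => e
  e.message
end
```

Only `nil` is rejected here, as in #add; an empty string or a non-callable object would still pass this check.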

#run ⇒ Object

Runs the manager while all the workers execute as actors in their own threads. This method blocks so that the spawned threads remain operational indefinitely.



# File 'lib/bifrost/manager.rb', line 34

def run
  # Put the supervisor thread to sleep indefinitely
  loop do
    # TODO: Perhaps there is a better way?
    sleep(60)
  end
end
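
One possible answer to the TODO above is to call `Kernel#sleep` without an argument, which suspends the calling thread until it is explicitly woken. The sketch below is a hypothetical variant, not the gem's implementation; `block_forever` is an illustrative name, and the outer loop guards against the thread being woken early.

```ruby
# Hypothetical variant of a blocking run loop; not the gem's implementation.
def block_forever
  loop do
    sleep # with no argument, sleep suspends the thread until it is woken
  end
end

# Run the blocking call in a separate thread so this example can terminate.
supervisor = Thread.new { block_forever }
sleep(0.1) # give the thread time to start and park itself
still_blocked = supervisor.alive? && supervisor.status == 'sleep'

# Stop the thread explicitly; nothing else would end it.
supervisor.kill
supervisor.join
```

The trade-off is small: this avoids waking once a minute, but the observable behaviour is the same as the loop shown in #run.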

#worker_ready(*args) ⇒ Object

This callback is fired when a worker signals that it is ready to commence work after initialisation, or to recommence after recovering from a failure. After a worker completes initialisation, it can take a while for it to be registered as an Actor in Celluloid, so a short delay is needed in the initialisation procedure.



# File 'lib/bifrost/manager.rb', line 46

def worker_ready(*args)
  info("Worker bootstrapping with #{args}...")
  # ENV values are strings, so convert to a number before sleeping
  sleep((ENV['BIFROST_BOOTSTRAP_DELAY'] || 2).to_f) # TODO: Perhaps there is a better way?
  worker = get_worker(args[1], args[2])
  if worker
    # Link the worker to the supervisor so if the worker misbehaves the supervisor is alerted
    # to this poor behaviour, the supervisor decides how to handle recovery
    link(worker)
    worker.async.run
  else
    error("Worker bootstrap failure with #{args}")
  end
end
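
Environment variables are always strings in Ruby, so a delay read from `ENV` has to be converted to a number before it can be passed to `sleep`. The following is a minimal sketch of one way to do that, assuming a hypothetical helper named `bootstrap_delay` that falls back to a default when the variable is unset or not numeric. The helper accepts any object that responds to `fetch`, so a plain Hash can stand in for `ENV`.

```ruby
# Hypothetical helper: read the bootstrap delay in seconds from the environment.
# Falls back to the default when the variable is missing or not a valid number.
def bootstrap_delay(env = ENV, default: 2.0)
  Float(env.fetch('BIFROST_BOOTSTRAP_DELAY', default))
rescue ArgumentError, TypeError
  default
end

configured = bootstrap_delay({ 'BIFROST_BOOTSTRAP_DELAY' => '0.5' })
unset      = bootstrap_delay({})
malformed  = bootstrap_delay({ 'BIFROST_BOOTSTRAP_DELAY' => 'soon' })
```

`Float()` is used rather than `String#to_f` so that a malformed value is detected and replaced by the default instead of silently becoming `0.0`.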