Class: Eventboss::Launcher
- Inherits:
-
Object
- Object
- Eventboss::Launcher
- Includes:
- Logging
- Defined in:
- lib/eventboss/launcher.rb
Overview
Launcher manages the lifecycle of queues and poller threads.
Constant Summary collapse
- DEFAULT_SHUTDOWN_ATTEMPTS =
5
- DEFAULT_SHUTDOWN_DELAY =
5
Instance Method Summary collapse
- #hard_shutdown ⇒ Object
-
#initialize(queues, client, options = {}) ⇒ Launcher
constructor
A new instance of Launcher.
- #poller_stopped(poller, restart: false) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #worker_stopped(worker, restart: false) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(queues, client, options = {}) ⇒ Launcher
Returns a new instance of Launcher.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/eventboss/launcher.rb', line 9
#
# Stores its collaborators and builds one poller per queue entry plus
# `worker_count` workers. This method does not call `start` on any of them.
#
# @param queues [#each, #size] collection yielding (queue, listener) pairs
# @param client [Object] kept in @client
# @param options [Hash] kept in @options (defaults to an empty hash)
def initialize(queues, client, options = {})
  @options = options
  @queues = queues
  @client = client

  # Guards the poller/worker sets when they are modified from the
  # *_stopped callbacks.
  @lock = Mutex.new

  # Bounded queue: capacity is ten slots per entry in `queues`.
  @bus = SizedQueue.new(@queues.size * 10)

  @pollers = Set.new
  @queues.each { |q, listener| @pollers << new_poller(q, listener) }

  @workers = Set.new
  worker_count.times { |id| @workers << new_worker(id) }
end
Instance Method Details
#hard_shutdown ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/eventboss/launcher.rb', line 43
#
# Kills every poller and worker still registered with the launcher.
# Does nothing (and logs nothing) when both sets are already empty.
def hard_shutdown
  return if [@pollers, @workers].all?(&:empty?)

  logger.info('launcher') do
    "Killing remaining #{@pollers.size} pollers, #{@workers.size} workers"
  end

  @pollers.each { |poller| poller.kill }
  @workers.each { |worker| worker.kill }
end
#poller_stopped(poller, restart: false) ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/eventboss/launcher.rb', line 59
#
# Removes a stopped poller from the launcher's set. With `restart: true`
# a new poller for the same queue and listener is built, started and
# registered in its place.
#
# @param poller [Object] the poller that stopped; must respond to
#   `queue`, `listener` and `id`
# @param restart [Boolean] whether to start a replacement poller
def poller_stopped(poller, restart: false)
  # The set is modified under the lock so removal and replacement happen
  # as one step.
  @lock.synchronize do
    @pollers.delete(poller)

    if restart
      replacement = new_poller(poller.queue, poller.listener)
      replacement.start
      @pollers << replacement
    end
  end

  logger.debug('launcher') do
    "Poller #{poller.id} stopped, restart: #{restart}"
  end
end
#start ⇒ Object
24 25 26 27 28 29 |
# File 'lib/eventboss/launcher.rb', line 24
#
# Starts all pollers first, then all workers, after logging how many of
# each are about to be started.
def start
  logger.info('launcher') do
    "Starting #{@workers.size} workers, #{@pollers.size} pollers"
  end

  @pollers.each { |poller| poller.start }
  @workers.each { |worker| worker.start }
end
#stop ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/eventboss/launcher.rb', line 31
#
# Shuts the launcher down: clears the bus, asks every poller and then
# every worker to terminate, closes the bus, waits via
# `wait_for_shutdown`, and finally kills whatever is still registered.
#
# NOTE(review): `wait_for_shutdown` is not visible here; presumably it is
# bounded by DEFAULT_SHUTDOWN_ATTEMPTS / DEFAULT_SHUTDOWN_DELAY - confirm.
def stop
  logger.info('launcher') { 'Gracefully shutdown' }

  # Drop anything still queued before asking the threads to finish.
  @bus.clear

  @pollers.each { |poller| poller.terminate }
  @workers.each { |worker| worker.terminate }

  @bus.close

  wait_for_shutdown
  hard_shutdown
end
#worker_stopped(worker, restart: false) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/eventboss/launcher.rb', line 51
#
# Removes a stopped worker from the launcher's set. With `restart: true`
# a new worker with the same id is built, started and registered in its
# place.
#
# @param worker [Object] the worker that stopped; must respond to `id`
# @param restart [Boolean] whether to start a replacement worker
def worker_stopped(worker, restart: false)
  # The set is modified under the lock so removal and replacement happen
  # as one step.
  @lock.synchronize do
    @workers.delete(worker)

    if restart
      replacement = new_worker(worker.id)
      replacement.start
      @workers << replacement
    end
  end

  logger.debug('launcher') do
    "Worker #{worker.id} stopped, restart: #{restart}"
  end
end