Class: Subserver::Manager

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/subserver/manager.rb

Overview

The Manager is the central coordination point in Subserver, controlling the lifecycle of the Google Cloud Listeners.

Tasks:

  1. start: Load subscibers and start listeners.

  2. listener_died: restart listener

  3. quiet: tell listeners to stop listening and finish processing messages then shutdown.

  4. stop: hard stop the listeners by deadline.

Note that only the last task requires its own Thread since it has to monitor the shutdown process. The other tasks are performed by other threads.

Constant Summary collapse

PAUSE_TIME =

hack for quicker development / testing environment

STDOUT.tty? ? 0.1 : 0.5

Constants included from Util

Util::EXPIRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(options = {}) ⇒ Manager

Returns a new instance of Manager.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/subserver/manager.rb', line 30

def initialize(options={})
  logger.debug { options.inspect }
  @options = options
  
  @done = false
  @listeners = Set.new

  subscribers.each do |subscriber|
    next if subscriber.auto_subscribe? && !subscriber.auto_subscribe
    @listeners << Listener.new(self, subscriber)
  end

  @listeners.select!{ |l| l.valid? }

  @plock = Mutex.new
end

Instance Attribute Details

#listenersObject (readonly)

Returns the value of attribute listeners.



26
27
28
# File 'lib/subserver/manager.rb', line 26

def listeners
  @listeners
end

#optionsObject (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/subserver/manager.rb', line 27

def options
  @options
end

#subscribersObject (readonly)

Returns the value of attribute subscribers.



28
29
30
# File 'lib/subserver/manager.rb', line 28

def subscribers
  @subscribers
end

Instance Method Details

#listener_died(listener, subscriber, reason) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/subserver/manager.rb', line 98

def listener_died(listener, subscriber, reason)
  logger.warn("Listener for #{subscriber.name} Died at #{Time.now}: #{reason}")
  # @plock.synchronize do
  #   @listeners.delete(listener)
  #   unless @done
  #     l = Listener.new(self, subscriber)
  #     @listeners << l
  #     l.start
  #   end
  # end
end

#listener_stopped(listener) ⇒ Object



92
93
94
95
96
# File 'lib/subserver/manager.rb', line 92

def listener_stopped(listener)
  @plock.synchronize do
    @listeners.delete(listener)
  end
end

#quietObject



58
59
60
61
62
63
64
65
# File 'lib/subserver/manager.rb', line 58

def quiet
  return if @done
  @done = true

  logger.info { "Stopping listeners" }
  @listeners.each { |x| x.stop }
  fire_event(:quiet, reverse: true)
end

#startObject



47
48
49
50
51
52
53
54
55
56
# File 'lib/subserver/manager.rb', line 47

def start
  if @listeners.count > 0
    logger.info("Starting Listeners For: #{@listeners.map(&:name).join(', ')}")
    @listeners.each do |x|
      x.start
    end
  else
    logger.warn("No Listeners starting: Couldn't find any subscribers.")
  end
end

#stop(deadline) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/subserver/manager.rb', line 70

def stop(deadline)
  quiet
  fire_event(:shutdown, reverse: true)

  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @listeners.empty?

  logger.info { "Pausing to allow listeners to finish..." }
  remaining = deadline - Time.now
  while remaining > PAUSE_TIME
    return if @listeners.empty?
    sleep PAUSE_TIME
    remaining = deadline - Time.now
  end
  return if @listeners.empty?

  hard_shutdown
end

#stopped?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/subserver/manager.rb', line 110

def stopped?
  @done
end