Class: Chef::Expander::VNodeSupervisor
- Inherits:
-
Object
- Object
- Chef::Expander::VNodeSupervisor
- Extended by:
- Loggable
- Includes:
- Loggable
- Defined in:
- lib/chef/expander/vnode_supervisor.rb
Constant Summary collapse
- COULD_NOT_CONNECT =
/Could not connect to server/.freeze
Constants included from Loggable
Instance Attribute Summary collapse
-
#local_node ⇒ Object
readonly
Returns the value of attribute local_node.
-
#vnode_table ⇒ Object
readonly
Returns the value of attribute vnode_table.
Class Method Summary collapse
- .await_parent_death ⇒ Object
- .start ⇒ Object
- .start_cluster_worker ⇒ Object
- .start_consumers ⇒ Object
- .stop_gracefully(signal) ⇒ Object
- .stop_immediately(signal) ⇒ Object
- .trap_signals ⇒ Object
- .wait_for_http_requests_to_complete ⇒ Object
Instance Method Summary collapse
-
#initialize ⇒ VNodeSupervisor
constructor
A new instance of VNodeSupervisor.
- #parse_symbolic(message) ⇒ Object
- #process_control_message(message) ⇒ Object
- #publish_status_to(return_queue) ⇒ Object
- #publish_vnode_table ⇒ Object
- #recover_vnode(vnode_id) ⇒ Object
- #release_vnode ⇒ Object
- #set_log_level(level, rsvp_to) ⇒ Object
- #spawn_vnode(vnode_number) ⇒ Object
- #start(vnode_ids) ⇒ Object
- #start_vnode_table_publisher ⇒ Object
- #stop ⇒ Object
- #vnode_added(vnode) ⇒ Object
- #vnode_removed(vnode) ⇒ Object
- #vnodes ⇒ Object
Methods included from Loggable
Constructor Details
#initialize ⇒ VNodeSupervisor
Returns a new instance of VNodeSupervisor.
146 147 148 149 150 151 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 146
# Returns a new instance of VNodeSupervisor.
#
# Starts with an empty vnode registry, a VNodeTable constructed with this
# supervisor, and the node handle returned by Node.local_node. The queue
# name and GUID are initialised to nil.
def initialize
  @vnodes = {}
  @vnode_table = VNodeTable.new(self)
  @local_node = Node.local_node
  @queue_name, @guid = nil, nil
end
Instance Attribute Details
#local_node ⇒ Object (readonly)
Returns the value of attribute local_node.
144 145 146 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 144
# Returns the value of attribute local_node (read-only).
def local_node
  @local_node
end
#vnode_table ⇒ Object (readonly)
Returns the value of attribute vnode_table.
142 143 144 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 142
# Returns the value of attribute vnode_table (read-only).
def vnode_table
  @vnode_table
end
Class Method Details
.await_parent_death ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 64
# Installs a one-second periodic timer that compares the current parent
# pid with @original_ppid. When they differ, the timer cancels itself and
# an immediate shutdown is triggered.
def self.await_parent_death
  @awaiting_parent_death = EM.add_periodic_timer(1) do
    unless Process.ppid == @original_ppid
      # Cancel first so the shutdown is only requested once.
      @awaiting_parent_death.cancel
      stop_immediately("master process death")
    end
  end
end
.start ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 73
# Entry point: creates the supervisor, installs signal handlers, loads the
# configuration from ARGV and starts the AMQP loop with the consumers.
#
# If AMQP raises an AMQP::Error whose message matches COULD_NOT_CONNECT,
# the error is logged, AMQP is hard-reset, and the connection is retried
# after five seconds. Any other AMQP::Error is re-raised.
def self.start
  @vnode_supervisor = new
  trap_signals

  Expander.init_config(ARGV)

  log.info("Chef Search Expander #{Expander.version} starting up.")

  begin
    AMQP.start(Expander.config.amqp_config) do
      start_consumers
    end
  rescue AMQP::Error => e
    if e.message =~ COULD_NOT_CONNECT
      log.error { "Could not connect to rabbitmq. Make sure it is running and correctly configured." }
      log.error { e.message }

      AMQP.hard_reset!

      # The retry is intentionally unbounded: the process keeps waiting
      # for the broker to come up instead of exiting.
      sleep 5
      retry
    else
      raise
    end
  end
end
.start_cluster_worker ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 49
# Starts a supervisor as a cluster worker: records the parent pid, installs
# signal handlers, sets the process title from the configured ps_tag, index
# and vnode range, then runs the AMQP loop with the consumers and the
# parent-death watcher.
def self.start_cluster_worker
  @vnode_supervisor = new
  # Remembered so await_parent_death can detect a changed parent pid.
  @original_ppid = Process.ppid
  trap_signals

  vnodes = Expander.config.vnode_numbers

  $0 = "chef-expander#{Expander.config.ps_tag} worker ##{Expander.config.index} (vnodes #{vnodes.min}-#{vnodes.max})"

  AMQP.start(Expander.config.amqp_config) do
    start_consumers
    await_parent_death
  end
end
.start_consumers ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 100
# Sets the MQ prefetch count to 1 and starts the supervisor on the vnode
# numbers taken from the Expander configuration.
def self.start_consumers
  log.debug { "Setting prefetch count to 1"}
  MQ.prefetch(1)

  vnodes = Expander.config.vnode_numbers
  log.info("Starting Consumers for vnodes #{vnodes.min}-#{vnodes.max}")
  @vnode_supervisor.start(vnodes)
end
.stop_gracefully(signal) ⇒ Object
123 124 125 126 127 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 123
# Graceful shutdown: stops the supervisor, then hands over to
# wait_for_http_requests_to_complete, which performs the final AMQP/EM stop.
#
# signal - value interpolated into the log line (e.g. :TERM).
def self.stop_gracefully(signal)
  log.info { "Initiating graceful shutdown on signal (#{signal})" }
  @vnode_supervisor.stop
  wait_for_http_requests_to_complete
end
.stop_immediately(signal) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 114
# Immediate shutdown: stops the supervisor, then stops AMQP and the
# EventMachine reactor from a one-second timer.
#
# signal - value interpolated into the log line (e.g. :INT or a reason string).
def self.stop_immediately(signal)
  log.info { "Initiating immediate shutdown on signal (#{signal})" }
  @vnode_supervisor.stop
  EM.add_timer(1) do
    AMQP.stop
    EM.stop
  end
end
.trap_signals ⇒ Object
109 110 111 112 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 109
# Installs the process signal handlers: INT triggers an immediate stop,
# TERM triggers a graceful stop.
def self.trap_signals
  Kernel.trap(:INT) { stop_immediately(:INT) }
  Kernel.trap(:TERM) { stop_gracefully(:TERM) }
end
.wait_for_http_requests_to_complete ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 129
# Polls Expander::Solrizer.http_requests_active? once per second (by
# re-scheduling itself on an EM timer). Once no requests are active, stops
# AMQP and the EventMachine reactor.
def self.wait_for_http_requests_to_complete
  if Expander::Solrizer.http_requests_active?
    log.info { "waiting for in progress HTTP Requests to complete"}
    EM.add_timer(1) do
      wait_for_http_requests_to_complete
    end
  else
    log.info { "HTTP requests completed, shutting down"}
    AMQP.stop
    EM.stop
  end
end
Instance Method Details
#parse_symbolic(message) ⇒ Object
259 260 261 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 259
# Parses +message+ with a Yajl parser configured with
# :symbolize_keys => true and returns the parsed result.
#
# message - the encoded payload to parse.
def parse_symbolic(message)
  Yajl::Parser.new(:symbolize_keys => true).parse(message)
end
#process_control_message(message) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 199
# Parses a control message and dispatches on its :action field.
#
# Recognised actions: "claim_vnode", "recover_vnode", "release_vnodes"
# (not implemented; raises "todo"), "update_vnode_table",
# "vnode_table_publish", "status" and "set_log_level". Any other action is
# logged as an invalid control message.
#
# message - the raw payload, handed to parse_symbolic.
#
# StandardError raised while parsing or dispatching is logged (class,
# message, backtrace) and not re-raised. Non-StandardError exceptions such
# as Interrupt or SystemExit are deliberately left to propagate.
def process_control_message(message)
  control_message = parse_symbolic(message)
  case control_message[:action]
  when "claim_vnode"
    spawn_vnode(control_message[:vnode_id])
  when "recover_vnode"
    recover_vnode(control_message[:vnode_id])
  when "release_vnodes"
    # Releasing vnodes is not implemented; the raise is caught below and
    # logged, so the request is reported instead of silently ignored.
    raise "todo"
  when "update_vnode_table"
    @vnode_table.update_table(control_message[:data])
  when "vnode_table_publish"
    publish_vnode_table
  when "status"
    publish_status_to(control_message[:rsvp])
  when "set_log_level"
    set_log_level(control_message[:level], control_message[:rsvp])
  else
    log.error { "invalid control message #{control_message.inspect}" }
  end
rescue StandardError => e
  log.error { "Error processing a control message."}
  log.error { "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" }
end
#publish_status_to(return_queue) ⇒ Object
237 238 239 240 241 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 237
# Publishes this node's status to the named queue: the local node's hash
# with the supervisor's vnode list added under :vnodes, encoded with Yajl.
#
# return_queue - name of the queue passed to MQ.queue.
def publish_status_to(return_queue)
  status_update = @local_node.to_hash
  status_update[:vnodes] = vnodes
  MQ.queue(return_queue).publish(Yajl::Encoder.encode(status_update))
end
#publish_vnode_table ⇒ Object
230 231 232 233 234 235 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 230
# Sends an :update_vnode_table control message through the local node. The
# payload is the local node's hash plus this supervisor's vnode list and
# an :update => :add marker.
def publish_vnode_table
  status_update = @local_node.to_hash
  status_update[:vnodes] = vnodes
  status_update[:update] = :add
  # NOTE(review): the method name after `@local_node.` was stripped from the
  # extracted listing; reconstructed as broadcast_message -- confirm against Node.
  @local_node.broadcast_message(Yajl::Encoder.encode({:action => :update_vnode_table, :data => status_update}))
end
#recover_vnode(vnode_id) ⇒ Object
250 251 252 253 254 255 256 257 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 250
# Handles a request to recover a vnode. Only acts when the vnode table
# reports the local node as leader: it then sends a :claim_vnode control
# message for +vnode_id+ through the local node. Otherwise the request is
# ignored with a debug log line.
#
# vnode_id - identifier of the vnode to be claimed.
def recover_vnode(vnode_id)
  if @vnode_table.local_node_is_leader?
    log.debug { "Recovering vnode: #{vnode_id}" }
    # NOTE(review): the method name after `@local_node.` was stripped from the
    # extracted listing; reconstructed as shared_message -- confirm against Node.
    @local_node.shared_message(Yajl::Encoder.encode({:action => :claim_vnode, :vnode_id => vnode_id}))
  else
    log.debug { "Ignoring :recover_vnode message because this node is not the leader" }
  end
end
#release_vnode ⇒ Object
195 196 197 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 195
# Placeholder for releasing a vnode; not implemented. Returns nil.
def release_vnode
  # TODO
end
#set_log_level(level, rsvp_to) ⇒ Object
243 244 245 246 247 248 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 243
# Applies a new log level to the Expander configuration and publishes a
# reply containing the resulting level and the local node's hash.
#
# level   - requested level; converted with #to_sym before assignment.
# rsvp_to - name of the queue that receives the encoded reply.
def set_log_level(level, rsvp_to)
  log.info { "setting log level to #{level} due to command from #{rsvp_to}" }
  new_log_level = (Expander.config.log_level = level.to_sym)
  reply = {:level => new_log_level, :node => @local_node.to_hash}
  MQ.queue(rsvp_to).publish(Yajl::Encoder.encode(reply))
end
#spawn_vnode(vnode_number) ⇒ Object
191 192 193 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 191
# Creates a VNode for +vnode_number+ bound to this supervisor and starts it.
#
# vnode_number - number passed to VNode.new.
def spawn_vnode(vnode_number)
  VNode.new(vnode_number, self).start
end
#start(vnode_ids) ⇒ Object
153 154 155 156 157 158 159 160 161 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 153
# Starts the local node, routing every message it yields to
# process_control_message, then spawns one vnode per id.
#
# vnode_ids - a single id, a collection of ids, or nil (coerced with Array()).
def start(vnode_ids)
  @local_node.start do |message|
    process_control_message(message)
  end

  #start_vnode_table_publisher

  Array(vnode_ids).each { |vnode_id| spawn_vnode(vnode_id) }
end
#start_vnode_table_publisher ⇒ Object
226 227 228 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 226
# Installs a periodic EventMachine timer that calls publish_vnode_table
# every 10 seconds and keeps the timer in @vnode_table_publisher.
def start_vnode_table_publisher
  @vnode_table_publisher = EM.add_periodic_timer(10) { publish_vnode_table }
end
#stop ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 163
# Stops the local node and then every registered vnode, logging each one.
def stop
  @local_node.stop

  #log.debug { "stopping vnode table updater" }
  #@vnode_table_publisher.cancel

  log.info { "Stopping VNode queue subscribers"}
  @vnodes.each do |vnode_number, vnode|
    log.debug { "Stopping consumer on VNode #{vnode_number}"}
    vnode.stop
  end
end
#vnode_added(vnode) ⇒ Object
177 178 179 180 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 177
# Registers +vnode+ in the supervisor's registry, keyed by its vnode number
# converted with #to_i.
#
# vnode - object responding to #vnode_number.
def vnode_added(vnode)
  log.debug { "vnode #{vnode.vnode_number} registered with supervisor" }
  @vnodes[vnode.vnode_number.to_i] = vnode
end
#vnode_removed(vnode) ⇒ Object
182 183 184 185 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 182
# Removes +vnode+ from the supervisor's registry, looked up by its vnode
# number converted with #to_i. Returns the removed entry, or nil when the
# number was not registered (Hash#delete semantics).
#
# vnode - object responding to #vnode_number.
def vnode_removed(vnode)
  log.debug { "vnode #{vnode.vnode_number} unregistered from supervisor" }
  @vnodes.delete(vnode.vnode_number.to_i)
end
#vnodes ⇒ Object
187 188 189 |
# File 'lib/chef/expander/vnode_supervisor.rb', line 187
# Returns the numbers of all registered vnodes in ascending order.
def vnodes
  @vnodes.keys.sort
end