Class: Chef::Expander::VNode
- Inherits:
-
Object
- Object
- Chef::Expander::VNode
- Includes:
- Loggable
- Defined in:
- lib/chef/expander/vnode.rb
Constant Summary
Constants included from Loggable
Instance Attribute Summary collapse
-
#supervise_interval ⇒ Object
readonly
Returns the value of attribute supervise_interval.
-
#vnode_number ⇒ Object
readonly
Returns the value of attribute vnode_number.
Instance Method Summary collapse
- #abort_on_multiple_subscribe ⇒ Object
- #control_queue_name ⇒ Object
-
#initialize(vnode_number, supervisor, opts = {}) ⇒ VNode
constructor
A new instance of VNode.
- #queue ⇒ Object
- #queue_name ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #supervise_consumer_count ⇒ Object
Methods included from Loggable
Constructor Details
#initialize(vnode_number, supervisor, opts = {}) ⇒ VNode
Returns a new instance of VNode.
37 38 39 40 41 42 43 |
# File 'lib/chef/expander/vnode.rb', line 37 def initialize(vnode_number, supervisor, opts={}) @vnode_number = vnode_number.to_i @supervisor = supervisor @queue = nil @stopped = false @supervise_interval = opts[:supervise_interval] || 30 end |
Instance Attribute Details
#supervise_interval ⇒ Object (readonly)
Returns the value of attribute supervise_interval.
35 36 37 |
# File 'lib/chef/expander/vnode.rb', line 35 def supervise_interval @supervise_interval end |
#vnode_number ⇒ Object (readonly)
Returns the value of attribute vnode_number.
33 34 35 |
# File 'lib/chef/expander/vnode.rb', line 33 def vnode_number @vnode_number end |
Instance Method Details
#abort_on_multiple_subscribe ⇒ Object
69 70 71 72 73 74 75 76 |
# File 'lib/chef/expander/vnode.rb', line 69 def abort_on_multiple_subscribe queue.status do |, subscriber_count| if subscriber_count.to_i > 1 log.error { "Detected extra consumers (#{subscriber_count} total) on queue #{queue_name}, cancelling subscription" } stop end end end |
#control_queue_name ⇒ Object
100 101 102 |
# File 'lib/chef/expander/vnode.rb', line 100 def control_queue_name "#{queue_name}-control" end |
#queue ⇒ Object
89 90 91 92 93 94 |
# File 'lib/chef/expander/vnode.rb', line 89 def queue @queue ||= begin log.debug { "declaring queue #{queue_name}" } MQ.queue(queue_name, :passive => false, :durable => true) end end |
#queue_name ⇒ Object
96 97 98 |
# File 'lib/chef/expander/vnode.rb', line 96 def queue_name "vnode-#{@vnode_number}" end |
#start ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/chef/expander/vnode.rb', line 45 def start @supervisor.vnode_added(self) subscription_confirmed = Proc.new do abort_on_multiple_subscribe supervise_consumer_count end queue.subscribe(:ack => true, :confirm => subscription_confirmed) do |headers, payload| log.debug {"got #{payload} size(#{payload.size} bytes) on queue #{queue_name}"} solrizer = Solrizer.new(payload) { headers.ack } solrizer.run end rescue MQ::Error => e log.error {"Failed to start subscriber on #{queue_name} #{e.class.name}: #{e.}"} end |
#stop ⇒ Object
78 79 80 81 82 83 |
# File 'lib/chef/expander/vnode.rb', line 78 def stop log.debug {"Cancelling subscription on queue #{queue_name.inspect}"} queue.unsubscribe if queue.subscribed? @supervisor.vnode_removed(self) @stopped = true end |
#stopped? ⇒ Boolean
85 86 87 |
# File 'lib/chef/expander/vnode.rb', line 85 def stopped? @stopped end |
#supervise_consumer_count ⇒ Object
63 64 65 66 67 |
# File 'lib/chef/expander/vnode.rb', line 63 def supervise_consumer_count EM.add_periodic_timer(supervise_interval) do abort_on_multiple_subscribe end end |