Class: Chef::Queue
- Inherits:
-
Object
- Object
- Chef::Queue
- Extended by:
- Mixin::ParamsValidate
- Defined in:
- lib/chef/queue.rb
Class Method Summary collapse
- .connect ⇒ Object
- .disconnect ⇒ Object
- .make_url(type, name) ⇒ Object
- .poll_msg ⇒ Object
- .receive_msg ⇒ Object
- .send_msg(type, name, msg) ⇒ Object
- .subscribe(type, name) ⇒ Object
Methods included from Mixin::ParamsValidate
Class Method Details
.connect ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/chef/queue.rb', line 33 def connect queue_user = Chef::Config[:queue_user] queue_password = Chef::Config[:queue_password] queue_host = Chef::Config[:queue_host] queue_port = Chef::Config[:queue_port] queue_retries = 1 unless queue_retries # Connection.open(login = "", passcode = "", host='localhost', port=61613, reliable=FALSE, reconnectDelay=5) @client = Stomp::Connection.open(queue_user, queue_password, queue_host, queue_port, false) rescue Errno::ECONNREFUSED Chef::Log.error("Connection refused connecting to stomp queue at #{queue_host}:#{queue_port}, retry #{queue_retries}/#{@queue_retry_count}") sleep(@queue_retry_delay) retry if (queue_retries += 1) < @queue_retry_count raise Errno::ECONNREFUSED, "Connection refused connecting to stomp queue at #{queue_host}:#{queue_port}, giving up" rescue Timeout::Error Chef::Log.error("Timeout connecting to stomp queue at #{queue_host}:#{queue_port}, retry #{queue_retries}/#{@queue_retry_count}") sleep(@queue_retry_delay) retry if (queue_retries += 1) < @queue_retry_count raise Timeout::Error, "Timeout connecting to stomp queue at #{queue_host}:#{queue_port}, giving up" else queue_retries = 1 # reset the number of retries on success end |
.disconnect ⇒ Object
139 140 141 142 |
# File 'lib/chef/queue.rb', line 139 def disconnect raise ArgumentError, "You must call connect before you can disconnect!" unless @client @client.disconnect end |
.make_url(type, name) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/chef/queue.rb', line 57 def make_url(type, name) validate( { :queue_type => type.to_sym, :queue_name => name.to_sym, }, { :queue_type => { :equal_to => [ :topic, :queue ], }, :queue_name => { :kind_of => [ String, Symbol ], } } ) if Chef::Config[:queue_prefix] queue_prefix = Chef::Config[:queue_prefix] queue_url = "/#{type}/#{queue_prefix}/chef/#{name}" else queue_url = "/#{type}/chef/#{name}" end queue_url end |
.poll_msg ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/chef/queue.rb', line 129 def poll_msg connect if @client == nil raw_msg = @client.poll() if raw_msg msg = JSON.parse(raw_msg.body) else nil end end |
.receive_msg ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/chef/queue.rb', line 116 def receive_msg connect if @client == nil begin raw_msg = @client.receive() Chef::Log.debug("Received Message from #{raw_msg.headers["destination"]} containing: #{raw_msg.body}") rescue Chef::Log.debug("Received nil message from stomp, retrying") retry end msg = JSON.parse(raw_msg.body) return msg, raw_msg.headers end |
.send_msg(type, name, msg) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/chef/queue.rb', line 88 def send_msg(type, name, msg) queue_retries = 1 unless queue_retries validate( { :message => msg, }, { :message => { :respond_to => :to_json } } ) queue_url = make_url(type, name) json = msg.to_json connect if @client == nil Chef::Log.debug("Sending to #{queue_url}: #{json}") begin @client.send(queue_url, json) rescue Errno::EPIPE Chef::Log.debug("Lost connection to stomp queue, reconnecting") connect retry if (queue_retries += 1) < @queue_retry_count raise Errno::EPIPE, "Lost connection to stomp queue, giving up" else queue_retries = 1 # reset the number of retries on success end end |