Class: MCollective::Client
- Inherits:
-
Object
- Object
- MCollective::Client
- Defined in:
- lib/mcollective/client.rb
Overview
Helpers for writing clients that can talk to agents, do discovery and so forth
Instance Attribute Summary collapse
-
#options ⇒ Object
Returns the value of attribute options.
-
#stats ⇒ Object
Returns the value of attribute stats.
Instance Method Summary collapse
-
#collective ⇒ Object
Returns the configured main collective if no specific collective is specified as options.
-
#disconnect ⇒ Object
Disconnects cleanly from the middleware.
-
#discover(filter, timeout, limit = 0) ⇒ Object
Performs a discovery of nodes matching the filter passed returns an array of nodes.
-
#discovered_req(body, agent, options = false) ⇒ Object
Performs a discovery and then send a request, performs the passed block for each response.
-
#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object
Prints out the stats returns from req and discovered_req in a nice way.
-
#initialize(configfile) ⇒ Client
constructor
A new instance of Client.
-
#receive(requestid = nil) ⇒ Object
Blocking call that waits for ever for a message to arrive.
-
#req(body, agent = nil, options = false, waitfor = 0) ⇒ Object
Send a request, performs the passed block for each response.
-
#sendreq(msg, agent, filter = {}) ⇒ Object
Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses.
- #subscribe(agent, type) ⇒ Object
- #unsubscribe(agent, type) ⇒ Object
Constructor Details
#initialize(configfile) ⇒ Client
Returns a new instance of Client.
6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/mcollective/client.rb', line 6 def initialize(configfile) @config = Config.instance @config.loadconfig(configfile) unless @config.configured @connection = PluginManager["connector_plugin"] @security = PluginManager["security_plugin"] @security.initiated_by = :client @options = nil @subscriptions = {} @connection.connect end |
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
4 5 6 |
# File 'lib/mcollective/client.rb', line 4 def @options end |
#stats ⇒ Object
Returns the value of attribute stats.
4 5 6 |
# File 'lib/mcollective/client.rb', line 4 def stats @stats end |
Instance Method Details
#collective ⇒ Object
Returns the configured main collective if no specific collective is specified as options
22 23 24 25 26 27 28 |
# File 'lib/mcollective/client.rb', line 22 def collective if @options[:collective].nil? @config.main_collective else @options[:collective] end end |
#disconnect ⇒ Object
Disconnects cleanly from the middleware
31 32 33 34 |
# File 'lib/mcollective/client.rb', line 31 def disconnect Log.debug("Disconnecting from the middleware") @connection.disconnect end |
#discover(filter, timeout, limit = 0) ⇒ Object
Performs a discovery of nodes matching the filter passed returns an array of nodes
An integer limit can be supplied this will have the effect of the discovery being cancelled soon as it reached the requested limit of hosts
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/mcollective/client.rb', line 114 def discover(filter, timeout, limit=0) raise "Limit has to be an integer" unless limit.is_a?(Fixnum) begin hosts = [] Timeout.timeout(timeout) do reqid = sendreq("ping", "discovery", filter) Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}") loop do reply = receive(reqid) Log.debug("Got discovery reply from #{reply.payload[:senderid]}") hosts << reply.payload[:senderid] return hosts if limit > 0 && hosts.size == limit end end rescue Timeout::Error => e rescue Exception => e raise ensure unsubscribe("discovery", :reply) end hosts.sort end |
#discovered_req(body, agent, options = false) ⇒ Object
Performs a discovery and then send a request, performs the passed block for each response
times = discovered_req("status", "mcollectived", , client) {|resp|
pp resp
}
It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/mcollective/client.rb', line 201 def discovered_req(body, agent, =false) stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} = @options unless STDOUT.sync = true print("Determining the amount of hosts matching filter for #{[:disctimeout]} seconds .... ") begin discovered_hosts = discover([:filter], [:disctimeout]) discovered = discovered_hosts.size hosts_responded = [] hosts_not_responded = discovered_hosts stat[:discoverytime] = Time.now.to_f - stat[:starttime] puts("#{discovered}\n\n") rescue Interrupt puts("Discovery interrupted.") exit! end raise("No matching clients found") if discovered == 0 begin Timeout.timeout([:timeout]) do reqid = sendreq(body, agent, [:filter]) (1..discovered).each do |c| resp = receive(reqid) hosts_responded << resp.payload[:senderid] hosts_not_responded.delete(resp.payload[:senderid]) if hosts_not_responded.include?(resp.payload[:senderid]) yield(resp.payload) end end rescue Interrupt => e rescue Timeout::Error => e end stat[:totaltime] = Time.now.to_f - stat[:starttime] stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] stat[:responses] = hosts_responded.size stat[:responsesfrom] = hosts_responded stat[:noresponsefrom] = hosts_not_responded stat[:discovered] = discovered @stats = stat return stat end |
#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object
Prints out the stats returns from req and discovered_req in a nice way
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/mcollective/client.rb', line 255 def display_stats(stats, =false, caption="stomp call summary") = @options unless if [:verbose] puts("\n---- #{caption} ----") if stats[:discovered] puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}") else puts(" Nodes: #{stats[:responses]}") end printf(" Start Time: %s\n", Time.at(stats[:starttime])) printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000) printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000) printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000) else if stats[:discovered] printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000) else printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000) end end if stats[:noresponsefrom].size > 0 puts("\nNo response from:\n") stats[:noresponsefrom].each do |c| puts if c % 4 == 1 printf("%30s", c) end puts end end |
#receive(requestid = nil) ⇒ Object
Blocking call that waits for ever for a message to arrive.
If you give it a requestid this means you’ve previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/mcollective/client.rb', line 84 def receive(requestid = nil) reply = nil begin reply = @connection.receive reply.type = :reply reply.expected_msgid = requestid reply.decode! reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON") raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid rescue SecurityValidationFailed => e Log.warn("Ignoring a message that did not pass security validations") retry rescue MsgDoesNotMatchRequestID => e Log.debug("Ignoring a message for some other client") retry end reply end |
#req(body, agent = nil, options = false, waitfor = 0) ⇒ Object
Send a request, performs the passed block for each response
times = req(“status”, “mcollectived”, options, client) {|resp|
pp resp
}
It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/mcollective/client.rb', line 149 def req(body, agent=nil, =false, waitfor=0) if body.is_a?(Message) agent = body.agent = body. waitfor = body.discovered_hosts.size || 0 end stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} = @options unless STDOUT.sync = true hosts_responded = 0 begin Timeout.timeout([:timeout]) do reqid = sendreq(body, agent, [:filter]) loop do resp = receive(reqid) hosts_responded += 1 yield(resp.payload) break if (waitfor != 0 && hosts_responded >= waitfor) end end rescue Interrupt => e rescue Timeout::Error => e ensure unsubscribe(agent, :reply) end stat[:totaltime] = Time.now.to_f - stat[:starttime] stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] stat[:responses] = hosts_responded stat[:noresponsefrom] = [] @stats = stat return stat end |
#sendreq(msg, agent, filter = {}) ⇒ Object
Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/mcollective/client.rb', line 38 def sendreq(msg, agent, filter = {}) if msg.is_a?(Message) request = msg agent = request.agent else ttl = @options[:ttl] || @config.ttl request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl}) request.reply_to = @options[:reply_to] if @options[:reply_to] end request.encode! Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}") subscribe(agent, :reply) request.publish request.requestid end |
#subscribe(agent, type) ⇒ Object
59 60 61 62 63 64 65 66 67 |
# File 'lib/mcollective/client.rb', line 59 def subscribe(agent, type) unless @subscriptions.include?(agent) subscription = Util.make_subscriptions(agent, type, collective) Log.debug("Subscribing to #{type} target for agent #{agent}") Util.subscribe(subscription) @subscriptions[agent] = 1 end end |
#unsubscribe(agent, type) ⇒ Object
69 70 71 72 73 74 75 76 77 |
# File 'lib/mcollective/client.rb', line 69 def unsubscribe(agent, type) if @subscriptions.include?(agent) subscription = Util.make_subscriptions(agent, type, collective) Log.debug("Unsubscribing #{type} target for #{agent}") Util.unsubscribe(subscription) @subscriptions.delete(agent) end end |