Class: MCollective::Client
- Inherits:
- Object
  - Object
  - MCollective::Client
- Defined in:
- lib/mcollective/client.rb
Overview
Helpers for writing clients that can talk to agents, do discovery and so forth
Constant Summary
- @@request_sequence = 0 # rubocop:disable Style/ClassVars
Instance Attribute Summary
-
#connection_timeout ⇒ Object
Returns the value of attribute connection_timeout.
-
#discoverer ⇒ Object
Returns the value of attribute discoverer.
-
#options ⇒ Object
Returns the value of attribute options.
-
#stats ⇒ Object
Returns the value of attribute stats.
Class Method Summary
- .request_sequence ⇒ Object
- .reset_request_sequence ⇒ Object
Instance Method Summary
-
#collective ⇒ Object
Returns the collective specified in the options, or the configured main collective if none is specified.
- #createreq(msg, agent, filter = {}) ⇒ Object
-
#disconnect ⇒ Object
Disconnects cleanly from the middleware.
-
#discover(filter, timeout, limit = 0) ⇒ Object
Performs a discovery of nodes matching the passed filter and returns an array of nodes.
- #discovered_req(body, agent, options = false) ⇒ Object
-
#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object
Prints the stats returned from req and discovered_req in a readable format.
-
#initialize(options) ⇒ Client
constructor
A new instance of Client.
- #publish(request) ⇒ Object
-
#receive(requestid = nil) ⇒ Object
Blocking call that waits forever for a message to arrive.
-
#req(body, agent = nil, options = false, waitfor = [], &block) ⇒ Object
Sends a request and runs the passed block for each response.
-
#sendreq(msg, agent, filter = {}) ⇒ Object
Sends a request and returns the generated request id. It does not wait for responses and does not execute any passed-in code blocks for responses.
-
#start_publisher(request, publish_timeout) ⇒ Object
Starts the request publishing routine.
-
#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object
Starts the response receiver routine. Expected to return the number of received responses.
- #subscribe(agent, type) ⇒ Object
-
#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object
Starts the client receiver and publisher in threads.
- #unsubscribe(agent, type) ⇒ Object
-
#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object
Starts the client receiver and publisher unthreaded.
- #update_stat(stat, hosts_responded, requestid) ⇒ Object
Constructor Details
#initialize(options) ⇒ Client
Returns a new instance of Client.
# File 'lib/mcollective/client.rb', line 6

def initialize(options)
  @config = Config.instance
  @options = nil

  case options
  when String
    # String is the path to a config file
    @config.loadconfig(options) unless @config.configured
  when Hash
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
    @config.federations = options[:federations] if options[:federations]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @subscriptions = {}

  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified
  # connection_timeout defaults to nil which means it will try forever if
  # not specified
  begin
    Timeout.timeout(@connection_timeout, ClientTimeoutError) do
      @connection.connect
    end
  rescue ClientTimeoutError => e
    Log.error("Timeout occured while trying to connect to middleware")
    raise e
  end
end
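The constructor time-boxes the middleware connection with `Timeout.timeout`. The following is a minimal sketch of that pattern in isolation, not the library's own code: `FakeConnector`, `timeboxed_connect`, and the locally defined `ClientTimeoutError` are hypothetical stand-ins for the connector plugin and MCollective's error class.

```ruby
require "timeout"

# Hypothetical stand-in for MCollective's ClientTimeoutError.
class ClientTimeoutError < StandardError; end

# Hypothetical connector whose connect call takes a configurable time.
class FakeConnector
  def initialize(delay)
    @delay = delay
  end

  def connect
    sleep(@delay)
    :connected
  end
end

# Time-box the connect call. Passing nil as the timeout makes
# Timeout.timeout run the block without any time limit.
def timeboxed_connect(connector, connection_timeout)
  Timeout.timeout(connection_timeout, ClientTimeoutError) do
    connector.connect
  end
end
```

Passing the error class as the second argument means a slow connect surfaces as `ClientTimeoutError` rather than the generic `Timeout::Error`, which lets the caller rescue connection timeouts specifically.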
Instance Attribute Details
#connection_timeout ⇒ Object
Returns the value of attribute connection_timeout.
# File 'lib/mcollective/client.rb', line 4

def connection_timeout
  @connection_timeout
end
#discoverer ⇒ Object
Returns the value of attribute discoverer.
# File 'lib/mcollective/client.rb', line 4

def discoverer
  @discoverer
end
#options ⇒ Object
Returns the value of attribute options.
# File 'lib/mcollective/client.rb', line 4

def options
  @options
end
#stats ⇒ Object
Returns the value of attribute stats.
# File 'lib/mcollective/client.rb', line 4

def stats
  @stats
end
Class Method Details
.request_sequence ⇒ Object
# File 'lib/mcollective/client.rb', line 52

def self.request_sequence
  @@request_sequence
end
.reset_request_sequence ⇒ Object
# File 'lib/mcollective/client.rb', line 48

def self.reset_request_sequence
  @@request_sequence = 0 # rubocop:disable Style/ClassVars
end
Instance Method Details
#collective ⇒ Object
Returns the collective specified in the options, or the configured main collective if none is specified.
# File 'lib/mcollective/client.rb', line 58

def collective
  if @options[:collective].nil?
    @config.main_collective
  else
    @options[:collective]
  end
end
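The fallback in `#collective` can be illustrated without a running client. This is a sketch only: `effective_collective` is a hypothetical helper that takes the options hash and the configured main collective as plain arguments.

```ruby
# Returns the collective named in the options hash, falling back to the
# configured main collective when the options do not set one.
def effective_collective(options, main_collective)
  if options[:collective].nil?
    main_collective
  else
    options[:collective]
  end
end
```

Because the check is `nil?`, only a missing or `nil` value triggers the fallback. Any other value, including `false`, is returned as given.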
#createreq(msg, agent, filter = {}) ⇒ Object
# File 'lib/mcollective/client.rb', line 80

def createreq(msg, agent, filter={})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, :agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl)
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  @@request_sequence += 1 # rubocop:disable Style/ClassVars

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end
#disconnect ⇒ Object
Disconnects cleanly from the middleware
# File 'lib/mcollective/client.rb', line 67

def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end
#discover(filter, timeout, limit = 0) ⇒ Object
Performs a discovery of nodes matching the passed filter and returns an array of nodes.
An integer limit can be supplied. Discovery is then cancelled as soon as it has reached the requested number of hosts.
# File 'lib/mcollective/client.rb', line 153

def discover(filter, timeout, limit=0)
  @discoverer.discover(filter.merge("collective" => collective), timeout, limit, self)
end
#discovered_req(body, agent, options = false) ⇒ Object
# File 'lib/mcollective/client.rb', line 304

def discovered_req(body, agent, options=false)
  raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end
#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object
Prints the stats returned from req and discovered_req in a readable format.
# File 'lib/mcollective/client.rb', line 309

def display_stats(stats, options=false, caption="stomp call summary")
  options ||= @options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  unless stats[:noresponsefrom].empty?
    puts("\nNo response from:\n")

    stats[:noresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end

  unless stats[:unexpectedresponsefrom].empty?
    puts("\nUnexpected response from:\n")

    stats[:unexpectedresponsefrom].each do |c|
      puts if c % 4 == 1
      printf("%30s", c)
    end

    puts
  end
end
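The non-verbose summary line multiplies the block time by 1000 to report milliseconds and formats it with two decimals. A small sketch of that formatting, assuming a hypothetical `summary_line` helper that returns the string instead of printing it:

```ruby
# Builds the one-line summary shown when verbose output is off.
# stats[:blocktime] is in seconds and is converted to milliseconds.
def summary_line(stats)
  if stats[:discovered]
    format("Finished processing %d / %d hosts in %.2f ms",
           stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    format("Finished processing %d hosts in %.2f ms",
           stats[:responses], stats[:blocktime] * 1000)
  end
end
```

For example, `summary_line({:responses => 3, :discovered => 4, :blocktime => 0.25})` yields `"Finished processing 3 / 4 hosts in 250.00 ms"`.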
#publish(request) ⇒ Object
# File 'lib/mcollective/client.rb', line 237

def publish(request)
  Log.info("Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'")
  request.publish
end
#receive(requestid = nil) ⇒ Object
Blocking call that waits forever for a message to arrive.
If you give it a requestid, this means you have previously sent a request with that ID and now want only replies that match it. In that case the current connection ignores all messages not directed at it and keeps waiting until it finds a matching message.
# File 'lib/mcollective/client.rb', line 123

def receive(requestid=nil)
  reply = nil

  begin
    reply = @connection.receive
    reply.type = :reply
    reply.expected_msgid = requestid

    reply.decode!

    raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}") unless reply.requestid == requestid

    Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")
  rescue SecurityValidationFailed => e
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID => e
    Log.debug("Ignoring a message for some other client : #{e.message}")
    retry
  end

  reply
end
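The receive loop relies on `retry` to discard replies that belong to another request. A self-contained sketch of that control flow, assuming a plain `Queue` in place of the connector and a hypothetical `Reply` struct in place of the real message class (the error class is redefined locally for the example):

```ruby
# Hypothetical error mirroring the role of MsgDoesNotMatchRequestID.
class MsgDoesNotMatchRequestID < StandardError; end

# Hypothetical reply type carrying only the fields the sketch needs.
Reply = Struct.new(:requestid, :senderid)

# Pops replies from the queue until one carries the wanted request id.
# Non-matching replies raise, are rescued, and the begin block is retried.
def receive_matching(queue, requestid)
  reply = nil

  begin
    reply = queue.pop
    unless reply.requestid == requestid
      raise MsgDoesNotMatchRequestID, "reqid #{reply.requestid} does not match #{requestid}"
    end
  rescue MsgDoesNotMatchRequestID
    retry
  end

  reply
end
```

Note that `Queue#pop` blocks when the queue is empty, so, like the method it imitates, this sketch waits indefinitely if no matching reply ever arrives.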
#req(body, agent = nil, options = false, waitfor = [], &block) ⇒ Object
Sends a request and runs the passed block for each response.
  times = req("status", "mcollectived", options, client) {|resp|
    pp resp
  }
It returns a hash of times. Timeouts for discovery and the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.
# File 'lib/mcollective/client.rb', line 165

def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  $stdout.sync = true
  hosts_responded = 0

  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt # rubocop:disable Lint/SuppressedException
  ensure
    unsubscribe(agent, :reply)
  end

  update_stat(stat, hosts_responded, request.requestid)
end
#sendreq(msg, agent, filter = {}) ⇒ Object
Sends a request and returns the generated request id. It does not wait for responses and does not execute any passed-in code blocks for responses.
# File 'lib/mcollective/client.rb', line 74

def sendreq(msg, agent, filter={})
  request = createreq(msg, agent, filter)
  publish(request)
  request.requestid
end
#start_publisher(request, publish_timeout) ⇒ Object
Starts the request publishing routine
# File 'lib/mcollective/client.rb', line 226

def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
  begin
    Timeout.timeout(publish_timeout) do
      publish(request)
    end
  rescue Timeout::Error
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end
#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object
Starts the response receiver routine. Expected to return the number of received responses.
# File 'lib/mcollective/client.rb', line 244

def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0

  if waitfor.is_a?(Array)
    unfinished = Hash.new(0)
    waitfor.each {|w| unfinished[w] += 1}
  else
    unfinished = []
  end

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        if block.arity == 2
          yield resp.payload, resp
        else
          yield resp.payload
        end

        hosts_responded += 1

        if waitfor.is_a?(Array)
          sender = resp.payload[:senderid]
          if unfinished[sender] <= 1
            unfinished.delete(sender)
          else
            unfinished[sender] -= 1
          end

          break if !waitfor.empty? && unfinished.empty?
        else
          break unless waitfor == 0 || hosts_responded < waitfor
        end
      end
    end
  rescue Timeout::Error
    if waitfor.is_a?(Array)
      Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}") unless unfinished.empty?
    elsif waitfor > hosts_responded
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end
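When `waitfor` is an array of host names, the receiver keeps a per-host counter and stops once every expected host has replied. The bookkeeping can be sketched on its own, assuming a hypothetical `outstanding_after` helper that takes the sender ids of the replies as a plain array and returns the hosts still outstanding:

```ruby
# Tracks outstanding senders the way the Array branch of the receiver does:
# each expected host gets a counter that is decremented as replies arrive.
def outstanding_after(waitfor, senders)
  unfinished = Hash.new(0)
  waitfor.each { |w| unfinished[w] += 1 }

  senders.each do |sender|
    if unfinished[sender] <= 1
      # Last expected reply from this host (or an unexpected host).
      unfinished.delete(sender)
    else
      unfinished[sender] -= 1
    end

    # Stop early once every expected host has answered.
    break if !waitfor.empty? && unfinished.empty?
  end

  unfinished.keys
end
```

A host listed twice in `waitfor` must reply twice before it is considered finished, and a reply from a host that was never expected leaves the counters unchanged.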
#subscribe(agent, type) ⇒ Object
# File 'lib/mcollective/client.rb', line 97

def subscribe(agent, type)
  unless @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Subscribing to #{type} target for agent #{agent}")

    Util.subscribe(subscription)
    @subscriptions[agent] = 1
  end
end
#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object
Starts the client receiver and publisher in threads. This is activated when the ‘threader_client’ configuration option is set.
# File 'lib/mcollective/client.rb', line 205

def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  Thread.new do
    start_publisher(request, publish_timeout)
  end

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  total_timeout = publish_timeout + timeout
  hosts_responded = 0

  receiver = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
  end

  receiver.join
  hosts_responded
end
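The threaded variant gives the receiver the sum of both timeouts so that it cannot expire while publishing is still in progress. A rough sketch of that arrangement, assuming a hypothetical `threaded_exchange` helper in which a `Queue` plays the role of the middleware and `expected` is the number of replies to wait for:

```ruby
require "timeout"

# Runs a publisher and a receiver concurrently and returns how many
# replies the receiver collected before finishing or timing out.
def threaded_exchange(publish_timeout, timeout, replies, expected)
  inbox = Queue.new

  publisher = Thread.new do
    Timeout.timeout(publish_timeout) { replies.each { |r| inbox << r } }
  end

  # The receiver waits for the publish window plus the agent timeout.
  total_timeout = publish_timeout + timeout
  received = 0

  receiver = Thread.new do
    begin
      Timeout.timeout(total_timeout) do
        loop do
          inbox.pop
          received += 1
          break if received == expected
        end
      end
    rescue Timeout::Error
      # Give up and report whatever arrived in time.
    end
  end

  receiver.join
  publisher.join
  received
end
```

Unlike the method above, this sketch also joins the publisher thread, which keeps the example deterministic but is a choice made here rather than something the source does.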
#unsubscribe(agent, type) ⇒ Object
# File 'lib/mcollective/client.rb', line 107

def unsubscribe(agent, type)
  if @subscriptions.include?(agent)
    subscription = Util.make_subscriptions(agent, type, collective)
    Log.debug("Unsubscribing #{type} target for #{agent}")

    Util.unsubscribe(subscription)
    @subscriptions.delete(agent)
  end
end
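Both `#subscribe` and `#unsubscribe` consult a hash keyed by agent name, so repeated calls for the same agent reach the middleware only once. A sketch of that guard, assuming a hypothetical `SubscriptionTracker` class that records the calls it would make instead of talking to a connector:

```ruby
# Hypothetical tracker that records middleware calls instead of making them.
class SubscriptionTracker
  attr_reader :calls

  def initialize
    @subscriptions = {}
    @calls = []
  end

  # Subscribes only if this agent has no active subscription yet.
  def subscribe(agent, type)
    return if @subscriptions.include?(agent)

    @calls << [:subscribe, agent, type]
    @subscriptions[agent] = 1
  end

  # Unsubscribes only if this agent is currently subscribed.
  def unsubscribe(agent, type)
    return unless @subscriptions.include?(agent)

    @calls << [:unsubscribe, agent, type]
    @subscriptions.delete(agent)
  end
end
```

As in the source, the guard is keyed by agent alone, so the `type` argument does not affect whether a call is treated as a duplicate.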
#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object
Starts the client receiver and publisher unthreaded. This is the default client behaviour.
# File 'lib/mcollective/client.rb', line 197

def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end
#update_stat(stat, hosts_responded, requestid) ⇒ Object
# File 'lib/mcollective/client.rb', line 293

def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []
  stat[:unexpectedresponsefrom] = []
  stat[:requestid] = requestid

  @stats = stat
end
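The timing fields are derived from one another: total time is measured from the start time, and block time is whatever remains after subtracting discovery time. A sketch of that arithmetic, assuming a hypothetical `finalize_stat` helper that accepts the current time as an argument so the result is reproducible:

```ruby
# Fills in the derived timing fields on a copy of the stat hash.
# All times are in seconds; `now` replaces Time.now.to_f for testability.
def finalize_stat(stat, hosts_responded, requestid, now)
  stat = stat.dup
  stat[:totaltime] = now - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:requestid] = requestid
  stat
end
```

With a start time of 100.0, a current time of 102.5, and 0.5 seconds of discovery, the total time is 2.5 seconds and the block time is 2.0 seconds.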