Class: MCollective::RPC::Client
- Inherits: Object
- Defined in: lib/mcollective/rpc/client.rb
Overview
The main component of the Simple RPC client system. It wraps MCollective::Client and layers a set of conventions and standard approaches on top of it.
Constant Summary
- @@initial_options = nil
Instance Attribute Summary
-
#agent ⇒ Object
readonly
Returns the value of attribute agent.
-
#batch_mode ⇒ Object
readonly
Returns the value of attribute batch_mode.
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#batch_sleep_time ⇒ Object
Returns the value of attribute batch_sleep_time.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#config ⇒ Object
Returns the value of attribute config.
-
#ddl ⇒ Object
readonly
Returns the value of attribute ddl.
-
#default_discovery_method ⇒ Object
readonly
Returns the value of attribute default_discovery_method.
-
#discovery_method ⇒ Object
Returns the value of attribute discovery_method.
-
#discovery_options ⇒ Object
Returns the value of attribute discovery_options.
-
#filter ⇒ Object
Returns the value of attribute filter.
-
#limit_method ⇒ Object
Returns the value of attribute limit_method.
-
#limit_seed ⇒ Object
readonly
Returns the value of attribute limit_seed.
-
#limit_targets ⇒ Object
Returns the value of attribute limit_targets.
-
#output_format ⇒ Object
readonly
Returns the value of attribute output_format.
-
#progress ⇒ Object
Returns the value of attribute progress.
-
#reply_to ⇒ Object
Returns the value of attribute reply_to.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#ttl ⇒ Object
Returns the value of attribute ttl.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
Instance Method Summary
-
#agent_filter(agent) ⇒ Object
Sets the agent filter.
- #aggregate_reply(reply, aggregate) ⇒ Object
-
#call_agent(action, args, opts, disc = :auto, &block) ⇒ Object
Handles traditional calls to the remote agents with full stats blocks, non blocks and everything else supported.
-
#call_agent_batched(action, args, opts, batch_size, sleep_time, &block) ⇒ Object
Calls an agent in a way very similar to call_agent but it supports batching the queries to the network.
-
#class_filter(klass) ⇒ Object
Sets the class filter.
-
#collective=(c) ⇒ Object
Sets the collective we are communicating with.
-
#compound_filter(filter) ⇒ Object
Set a compound filter.
-
#custom_request(action, args, expected_agents, filter = {}, &block) ⇒ Object
Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will respond and what your filter should be.
-
#disconnect ⇒ Object
Disconnects cleanly from the middleware.
-
#discover(flags = {}) ⇒ Object
Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.
- #discovery_timeout ⇒ Object
- #discovery_timeout=(timeout) ⇒ Object
-
#fact_filter(fact, value = nil, operator = "=") ⇒ Object
Sets the fact filter.
-
#fire_and_forget_request(action, args, filter = nil) ⇒ Object
For requests that do not care about results: just returns the request id and skips all response processing.
-
#help(template) ⇒ Object
Returns help for an agent if a DDL was found.
-
#identity_filter(identity) ⇒ Object
Sets the identity filter.
-
#identity_filter_discovery_optimization ⇒ Object
If an identity filter is supplied and it consists only of strings, with no regular expressions, we can use it as discovery data. Technically the identity filter is then redundant in direct addressing mode and could be emptied out, but this use case should only really arise for a few -I’s on the CLI.
-
#initialize(agent, flags = {}) ⇒ Client
constructor
Creates a stub for a remote agent. You can pass in an options array via the :options flag, which will then be used; otherwise a default options array is created with filtering enabled, based on standard command line use.
- #load_aggregate_functions(action, ddl) ⇒ Object
-
#method_missing(method_name, *args, &block) ⇒ Object
Magic handler to invoke remote methods.
-
#new_request(action, data) ⇒ Object
Creates a suitable request hash for the SimpleRPC agent.
-
#options ⇒ Object
Provides a normal options hash like you would get from Optionparser.
-
#pick_nodes_from_discovered(count) ⇒ Object
Pick a number of nodes from the discovered nodes.
-
#process_results_with_block(action, resp, block, aggregate) ⇒ Object
Processes client requests by calling a block on each result. In this mode we do not do anything fancy with the result objects, and we raise exceptions if there are problems with the data.
-
#process_results_without_block(resp, action, aggregate) ⇒ Object
Handles result sets that have no block associated: sets fails and ok in the stats object and returns a hash of the response to send to the caller.
-
#reset ⇒ Object
Resets various internal parts of the class, most importantly it clears out the cached discovery.
-
#reset_filter ⇒ Object
Resets the filter to an empty one.
- #rpc_result_from_reply(agent, action, reply) ⇒ Object
-
#validate_request(action, args) ⇒ Object
For the provided arguments and action the input arguments get modified by supplying any defaults provided in the DDL for arguments that were not supplied in the request.
Constructor Details
#initialize(agent, flags = {}) ⇒ Client
Creates a stub for a remote agent. You can pass in an options array via the :options flag, which will then be used; otherwise a default options array is created with filtering enabled, based on standard command line use.
rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => )
You typically would not call this directly; you’d use MCollective::RPC#rpcclient instead, which is a wrapper around this that can be used as a mixin.
# File 'lib/mcollective/rpc/client.rb', line 20
def initialize(agent, flags = {})
  if flags.include?(:options)
    = flags[:options]
  elsif @@initial_options
    = Marshal.load(@@initial_options)
  else
    oparser = MCollective::Optionparser.new({ :verbose => false,
                                              :progress_bar => true,
                                              :mcollective_limit_targets => false,
                                              :batch_size => nil,
                                              :batch_sleep_time => 1 },
                                            "filter")

    = oparser.parse do |parser, opts|
      if block_given?
        yield(parser, opts)
      end

      Helpers.(parser, opts)
    end

    @@initial_options = Marshal.dump()
  end

  @initial_options =

  @config = [:config]
  @client = MCollective::Client.new(@initial_options)
  @stats = Stats.new
  @agent = agent
  @timeout = [:timeout] || 5
  @verbose = [:verbose]
  @filter = [:filter] || Util.empty_filter
  @discovered_agents = nil
  @progress = [:progress_bar]
  @limit_targets = [:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @limit_seed = [:limit_seed] || nil
  @output_format = [:output_format] || :console
  @force_direct_request = false
  @reply_to = [:reply_to]
  @discovery_method = [:discovery_method]
  if !@discovery_method
    @discovery_method = Config.instance.default_discovery_method
    @default_discovery_method = true
  else
    @default_discovery_method = false
  end
  @discovery_options = [:discovery_options] || []
  @force_display_mode = [:force_display_mode] || false
  @batch_size = [:batch_size] || 0
  @batch_sleep_time = Float([:batch_sleep_time] || 1)
  @batch_mode = determine_batch_mode(@batch_size)

  agent_filter agent

  @discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout

  @collective = @client.collective
  @ttl = [:ttl] || Config.instance.ttl
  @publish_timeout = [:publish_timeout] || Config.instance.publish_timeout
  @threaded = [:threaded] || Config.instance.threaded

  # if we can find a DDL for the service override
  # the timeout of the client so we always magically
  # wait appropriate amounts of time.
  #
  # We add the discovery timeout to the ddl supplied
  # timeout as the discovery timeout tends to be tuned
  # for local network conditions and fact source speed
  # which would other wise not be accounted for and
  # some results might get missed.
  #
  # We do this only if the timeout is the default 5
  # seconds, so that users cli overrides will still
  # get applied
  #
  # DDLs are required, failure to find a DDL is fatal
  @ddl = DDL.new(agent)
  @stats.ddl = @ddl
  @timeout = @ddl.[:timeout] + discovery_timeout if @timeout == 5

  # allows stderr and stdout to be overridden for testing
  # but also for web apps that might not want a bunch of stuff
  # generated to actual file handles
  if [:stderr]
    @stderr = [:stderr]
  else
    @stderr = STDERR
    @stderr.sync = true
  end

  if [:stdout]
    @stdout = [:stdout]
  else
    @stdout = STDOUT
    @stdout.sync = true
  end
end
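The stream handling at the end of the constructor can be tried in isolation. The following is a minimal sketch, assuming a hypothetical OutputStreams class that is not part of MCollective; it only mirrors the pattern of taking :stdout and :stderr from an options hash and falling back to the real handles.

```ruby
require "stringio"

# Hypothetical stand-in (not part of MCollective) that mirrors how the
# constructor picks its output handles from an options hash.
class OutputStreams
  attr_reader :stdout, :stderr

  def initialize(options = {})
    # Use caller-supplied handles when given, else the real ones.
    @stderr = options[:stderr] || STDERR
    @stdout = options[:stdout] || STDOUT
    # Only the real handles are switched to unbuffered mode, as in the source.
    @stderr.sync = true unless options[:stderr]
    @stdout.sync = true unless options[:stdout]
  end

  def report(message)
    @stdout.puts(message)
  end
end

# Capture output in memory, as a test or web application might.
captured = StringIO.new
streams = OutputStreams.new(:stdout => captured)
streams.report("No request sent")
```

Passing a StringIO keeps progress and error text out of the real file handles, which is the use case the source comments describe for tests and web applications.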
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_name, *args, &block) ⇒ Object
Magic handler to invoke remote methods
Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:
ret = rpc.echo(:msg => "hello world")
This will call the ‘echo’ action of the ‘rpctest’ agent and return the result as an array, the array will be a simplified result set from the usual full MCollective::Client#req with additional error codes and error text:
:sender => "remote.box.com",
:statuscode => 0,
:statusmsg => "OK",
:data => "hello world"
If :statuscode is 0 then everything went fine. If it’s 1 then you supplied the correct arguments etc. but the request could not be completed; you’ll find a human-readable reason in :statusmsg.
Codes 2 to 5 map directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError (described below); in each case :statusmsg holds the reason for the failure.
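The status codes above can be turned into a small lookup when post-processing results. This is a sketch only: the STATUS_MEANINGS table, the describe_result helper and the :ok and :aborted labels are illustrative names chosen here, not MCollective identifiers.

```ruby
# Status codes as described above. Codes 0 and 1 carry no exception name;
# the labels :ok and :aborted are illustrative, not MCollective identifiers.
STATUS_MEANINGS = {
  0 => :ok,
  1 => :aborted,
  2 => "UnknownRPCAction",
  3 => "MissingRPCData",
  4 => "InvalidRPCData",
  5 => "UnknownRPCError"
}.freeze

# Summarise one simplified result hash (the :sender/:statuscode/:statusmsg
# shape shown above) as a single line of text.
def describe_result(result)
  meaning = STATUS_MEANINGS.fetch(result[:statuscode], :unrecognised)
  "#{result[:sender]}: #{meaning} (#{result[:statusmsg]})"
end

sample = { :sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world" }
summary = describe_result(sample)
```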
To get access to the full result of the MCollective::Client#req calls you can pass in a block:
rpc.echo(:msg => "hello world") do |resp|
pp resp
end
In this case resp will be the result from MCollective::Client#req. Instead of receiving simple text and codes as above, you’ll also need to handle the following exceptions:
UnknownRPCAction - There is no matching action on the agent
MissingRPCData - You did not supply all the needed parameters for the action
InvalidRPCData - The data you did supply did not pass validation
UnknownRPCError - Some other error prevented the agent from running
During calls a progress indicator shows how many results have been received against how many nodes were discovered. You can disable this by setting progress to false:
rpc.progress = false
This supports a 2nd mode where it will send the SimpleRPC request and never handle the responses. It’s a bit like UDP, it sends the request with the filter attached and you only get back the requestid, you have no indication about results.
You can invoke this using:
puts rpc.echo(:process_results => false)
This will output just the request id.
Batched processing is supported:
printrpc rpc.ping(:batch_size => 5)
This will do everything exactly as normal but communicate with only 5 agents at a time.
# File 'lib/mcollective/rpc/client.rb', line 235
def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  validate_request(action, args)

  # TODO(ploubser): The logic here seems poor. It implies that it is valid to
  # pass arguments where batch_mode is set to false and batch_mode > 0.
  # If this is the case we completely ignore the supplied value of batch_mode
  # and do our own thing.

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_mode = args.include?(:batch_size) || @batch_mode
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # if we were given a batch_size argument thats 0 and batch_mode was
  # determined to be on via global options etc this will allow a batch_size
  # of 0 to disable or batch_mode for this call only
  batch_mode = determine_batch_mode(batch_size)

  # Handle single target requests by doing discovery and picking
  # a random node. Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, , batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, , :auto, &block)
  end
end
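The dispatch idea behind method_missing, where any unknown method name becomes an action name, can be shown without a network. The sketch below assumes a hypothetical ActionStub class; it records calls instead of sending them and leaves out validation, batching and target limiting.

```ruby
# Hypothetical stand-in for the RPC stub: every unknown method name is
# treated as an action name, and the first argument as its argument hash.
class ActionStub
  attr_reader :calls

  def initialize(agent)
    @agent = agent
    @calls = []
  end

  def method_missing(method_name, *args, &block)
    # Same normalisation as the source: no arguments means an empty hash.
    arguments = args[0] || {}
    action = method_name.to_s
    @calls << [@agent, action, arguments]
    # A real client would send the request here; the sketch only records it.
    { :agent => @agent, :action => action, :args => arguments }
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(method_name, include_private = false)
    true
  end
end

stub = ActionStub.new("rpctest")
request = stub.echo(:msg => "hello world")
```

Because the sketch accepts every method name, it is only suitable for illustration; the real client validates the action against the agent's DDL through validate_request before anything is sent.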
Instance Attribute Details
#agent ⇒ Object (readonly)
Returns the value of attribute agent.
# File 'lib/mcollective/rpc/client.rb', line 7
def agent
  @agent
end
#batch_mode ⇒ Object (readonly)
Returns the value of attribute batch_mode.
# File 'lib/mcollective/rpc/client.rb', line 7
def batch_mode
  @batch_mode
end
#batch_size ⇒ Object
Returns the value of attribute batch_size.
# File 'lib/mcollective/rpc/client.rb', line 7
def batch_size
  @batch_size
end
#batch_sleep_time ⇒ Object
Returns the value of attribute batch_sleep_time.
# File 'lib/mcollective/rpc/client.rb', line 7
def batch_sleep_time
  @batch_sleep_time
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/mcollective/rpc/client.rb', line 7
def client
  @client
end
#config ⇒ Object
Returns the value of attribute config.
# File 'lib/mcollective/rpc/client.rb', line 6
def config
  @config
end
#ddl ⇒ Object (readonly)
Returns the value of attribute ddl.
# File 'lib/mcollective/rpc/client.rb', line 7
def ddl
  @ddl
end
#default_discovery_method ⇒ Object (readonly)
Returns the value of attribute default_discovery_method.
# File 'lib/mcollective/rpc/client.rb', line 8
def default_discovery_method
  @default_discovery_method
end
#discovery_method ⇒ Object
Returns the value of attribute discovery_method.
# File 'lib/mcollective/rpc/client.rb', line 8
def discovery_method
  @discovery_method
end
#discovery_options ⇒ Object
Returns the value of attribute discovery_options.
# File 'lib/mcollective/rpc/client.rb', line 8
def
  @discovery_options
end
#filter ⇒ Object
Returns the value of attribute filter.
# File 'lib/mcollective/rpc/client.rb', line 6
def filter
  @filter
end
#limit_method ⇒ Object
Returns the value of attribute limit_method.
# File 'lib/mcollective/rpc/client.rb', line 7
def limit_method
  @limit_method
end
#limit_seed ⇒ Object (readonly)
Returns the value of attribute limit_seed.
# File 'lib/mcollective/rpc/client.rb', line 8
def limit_seed
  @limit_seed
end
#limit_targets ⇒ Object
Returns the value of attribute limit_targets.
# File 'lib/mcollective/rpc/client.rb', line 7
def limit_targets
  @limit_targets
end
#output_format ⇒ Object (readonly)
Returns the value of attribute output_format.
# File 'lib/mcollective/rpc/client.rb', line 7
def output_format
  @output_format
end
#progress ⇒ Object
Returns the value of attribute progress.
# File 'lib/mcollective/rpc/client.rb', line 6
def progress
  @progress
end
#reply_to ⇒ Object
Returns the value of attribute reply_to.
# File 'lib/mcollective/rpc/client.rb', line 6
def reply_to
  @reply_to
end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
# File 'lib/mcollective/rpc/client.rb', line 7
def stats
  @stats
end
#timeout ⇒ Object
Returns the value of attribute timeout.
# File 'lib/mcollective/rpc/client.rb', line 6
def timeout
  @timeout
end
#ttl ⇒ Object
Returns the value of attribute ttl.
# File 'lib/mcollective/rpc/client.rb', line 6
def ttl
  @ttl
end
#verbose ⇒ Object
Returns the value of attribute verbose.
# File 'lib/mcollective/rpc/client.rb', line 6
def verbose
  @verbose
end
Instance Method Details
#agent_filter(agent) ⇒ Object
Sets the agent filter
# File 'lib/mcollective/rpc/client.rb', line 431
def agent_filter(agent)
  @filter["agent"] = @filter["agent"] | [agent]
  @filter["agent"].compact!
  reset
end
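The filter setters rely on array union to avoid duplicate entries. The sketch below illustrates that pattern; the empty_filter_sketch helper is an assumption about the shape of an empty filter (Util.empty_filter itself is not shown in this document), and add_to_filter is a hypothetical helper, not an MCollective method.

```ruby
# Assumed shape of an empty filter: one array per filter type. The real
# Util.empty_filter is not shown in this document, so this is a guess.
def empty_filter_sketch
  { "fact" => [], "cf_class" => [], "agent" => [], "identity" => [], "compound" => [] }
end

# Same pattern as agent_filter, class_filter and identity_filter:
# array union appends the value only if it is not already present.
def add_to_filter(filter, type, value)
  filter[type] = filter[type] | [value]
  filter[type].compact!
  filter
end

filter = empty_filter_sketch
add_to_filter(filter, "agent", "rpctest")
add_to_filter(filter, "agent", "rpctest") # duplicate, ignored by the union
add_to_filter(filter, "agent", nil)       # nil is removed by compact!
```

The real setters also call reset afterwards, so that a cached discovery made with the old filter is discarded.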
#aggregate_reply(reply, aggregate) ⇒ Object
# File 'lib/mcollective/rpc/client.rb', line 707
def aggregate_reply(reply, aggregate)
  return nil unless aggregate

  aggregate.call_functions(reply)
  return aggregate
rescue Exception => e
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  return nil
end
#call_agent(action, args, opts, disc = :auto, &block) ⇒ Object
Handles traditional calls to the remote agents with full stats blocks, non blocks and everything else supported.
Other methods of calling the nodes can reuse this code by for example specifying custom options and discovery data
# File 'lib/mcollective/rpc/client.rb', line 883
def call_agent(action, args, opts, disc=:auto, &block)
  # Handle fire and forget requests and make sure
  # the :process_results value is set appropriately
  #
  # specific reply-to requests should be treated like
  # fire and forget since the client will never get
  # the responses
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args)
  else
    args[:process_results] = true
  end

  # Do discovery when no specific discovery array is given
  #
  # If an array is given set the force_direct_request hint that
  # will tell the message object to be a direct request one
  if disc == :auto
    discovered = discover
  else
    @force_direct_request = true if Config.instance.direct_addressing
    discovered = disc
  end

  req = new_request(action.to_s, args)

  = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
  .discovered_hosts = discovered.clone

  results = []
  respcount = 0

  if discovered.size > 0
    .type = :direct_request if @force_direct_request

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    aggregate = load_aggregate_functions(action, @ddl)

    @client.req() do |resp|
      respcount += 1

      if block_given?
        aggregate = process_results_with_block(action, resp, block, aggregate)
      else
        @stdout.print twirl.twirl(respcount, discovered.size) if @progress

        result, aggregate = process_results_without_block(resp, action, aggregate)

        results << result
      end
    end

    if @initial_options[:sort]
      results.sort!
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
    @stats.client_stats = @client.stats
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
#call_agent_batched(action, args, opts, batch_size, sleep_time, &block) ⇒ Object
Calls an agent in a way very similar to call_agent but it supports batching the queries to the network.
The result sets, stats, block handling etc is all exactly like you would expect from normal call_agent.
This is used by method_missing and works only with direct addressing mode
# File 'lib/mcollective/rpc/client.rb', line 779
def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
  validate_batch_size(batch_size)

  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    if (batch_size =~ /^(\d+)%$/)
      # determine batch_size as a percentage of the discovered array's size
      batch_size = (discovered.size / 100.0 * Integer($1)).ceil
    else
      batch_size = Integer(batch_size)
    end

    @stats.requestid = nil
    processed_nodes = 0

    discovered.in_groups_of(batch_size) do |hosts|
      = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = .create_reqid unless @stats.requestid
      .requestid = @stats.requestid

      .discovered_hosts = hosts.clone.compact

      @client.req() do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      if @initial_options[:sort]
        results.sort!
      end

      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      processed_nodes += hosts.length
      if (discovered.length > processed_nodes)
        sleep sleep_time
      end
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
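The batch size handling in call_agent_batched accepts either a plain count or a percentage of the discovered nodes. A minimal sketch of that calculation follows; resolve_batch_size and batches are hypothetical helper names, and Array#each_slice is used as a stand-in for the in_groups_of helper in the source, which is not a core Ruby method.

```ruby
# Resolve a batch size that is either a count (5, "5") or a percentage
# string ("10%") of the discovered nodes, mirroring the branch in the source.
def resolve_batch_size(batch_size, discovered_count)
  if batch_size.to_s =~ /^(\d+)%$/
    # A percentage is rounded up so a batch is never empty.
    (discovered_count / 100.0 * Integer($1)).ceil
  else
    Integer(batch_size)
  end
end

# Array#each_slice stands in for the in_groups_of helper used in the source.
def batches(nodes, batch_size)
  nodes.each_slice(resolve_batch_size(batch_size, nodes.size)).to_a
end

nodes = (1..100).map { |i| "node#{i}.example.com" }
groups = batches(nodes, "5%")
```

In the source, the client sleeps for sleep_time between batches and skips the sleep after the last one, which this sketch leaves out.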
#class_filter(klass) ⇒ Object
Sets the class filter
# File 'lib/mcollective/rpc/client.rb', line 407
def class_filter(klass)
  @filter["cf_class"] = @filter["cf_class"] | [klass]
  @filter["cf_class"].compact!
  reset
end
#collective=(c) ⇒ Object
Sets the collective we are communicating with
# File 'lib/mcollective/rpc/client.rb', line 583
def collective=(c)
  raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)

  @collective = c
  @client. =
  reset
end
#compound_filter(filter) ⇒ Object
Set a compound filter
# File 'lib/mcollective/rpc/client.rb', line 445
def compound_filter(filter)
  @filter["compound"] = @filter["compound"] | [Matcher.create_compound_callstack(filter)]
  reset
end
#custom_request(action, args, expected_agents, filter = {}, &block) ⇒ Object
Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will respond and what your filter should be.
This will help you essentially short circuit the traditional cycle of:
mc discover / call / wait for discovered nodes
by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.
Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, etc.
If you just wanted to contact one machine for example with a client that already has other filter options setup you can do:
puppet.custom_request("runonce", {}, ["your.box.com"], => "your.box.com")
This will run the runonce action on just ‘your.box.com’. No discovery will be done, and after receiving just one response it will stop waiting for responses.
If direct_addressing is enabled in the config file you can provide an empty hash as a filter; this will force the request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.
# File 'lib/mcollective/rpc/client.rb', line 305
def custom_request(action, args, expected_agents, filter = {}, &block)
  validate_request(action, args)

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  = .clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  [:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  if block_given?
    call_agent(action, args, , [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, , [expected_agents].flatten)
  end
end
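The filter merge inside custom_request copies only the known filter types and always adds the stub's own agent. The sketch below reproduces that step on plain hashes; build_custom_filter is a hypothetical helper, and building the empty filter from the list of filter types is an assumption standing in for Util.empty_filter.

```ruby
FILTER_TYPES = ["identity", "fact", "agent", "cf_class", "compound"].freeze

# Merge a caller-supplied filter into an empty one, copying only the known
# filter types and always restricting the call to the given agent.
def build_custom_filter(supplied, agent)
  # Assumed empty filter: one array per known filter type.
  custom = FILTER_TYPES.each_with_object({}) { |type, memo| memo[type] = [] }

  FILTER_TYPES.each do |type|
    custom[type] = [supplied[type], custom[type]].flatten if supplied.include?(type)
  end

  custom["agent"] << agent unless custom["agent"].include?(agent)
  custom
end

custom = build_custom_filter({ "identity" => "your.box.com", "bogus" => "ignored" }, "puppet")
```

Copying key by key rather than calling Hash#merge means an unexpected key such as "bogus" never reaches the request, which matches the intent stated in the source comments.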
#disconnect ⇒ Object
Disconnects cleanly from the middleware
# File 'lib/mcollective/rpc/client.rb', line 125
def disconnect
  @client.disconnect
end
#discover(flags = {}) ⇒ Object
Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.
Alternatively if identity filters are given and none of them are regular expressions then just use the provided data as discovered data, avoiding discovery
Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes or JSON data like those produced by mcollective RPC JSON output using :json
Will show a message indicating it’s doing discovery if running verbose or if the :verbose flag is passed in.
Use reset to force a new discovery
# File 'lib/mcollective/rpc/client.rb', line 477
def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

  reset if flags[:nodes] || flags[:json]

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if flags[:nodes] || flags[:json]
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = []

      if flags[:nodes]
        hosts = Helpers.extract_hosts_from_array(flags[:nodes])
      elsif flags[:json]
        hosts = Helpers.extract_hosts_from_json(flags[:json])
      end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true
    else
      identity_filter_discovery_optimization
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    @client. =

    # if compound filters are used the only real option is to use the mc
    # discovery plugin since its the only capable of using data queries etc
    # and we do not want to degrade that experience just to allow compounds
    # on other discovery plugins the UX would be too bad raising complex sets
    # of errors etc.
    @client.discoverer.force_discovery_method_by_filter([:filter])

    if verbose
      actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, [:filter])

      if actual_timeout > 0
        @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
      else
        @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
      end
    end

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    filter = @filter.merge({'collective' => @collective})
    if @limit_method == :first and @limit_targets.is_a?(Fixnum)
      @discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
    else
      @discovered_agents = @client.discover(filter, discovery_timeout)
    end

    @stderr.puts(@discovered_agents.size) if verbose

    @force_direct_request = @client.discoverer.force_direct_mode?

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end
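The first few lines of discover validate the flags and fold the legacy :hosts key into :nodes. A minimal sketch of that normalisation on a plain hash is shown here; normalise_discover_flags is a hypothetical helper name, and unlike the source it works on a copy rather than modifying the caller's hash.

```ruby
ALLOWED_DISCOVER_FLAGS = [:verbose, :hosts, :nodes, :json].freeze

# Validate the flags passed to discover and fold the legacy :hosts key
# into :nodes, as the first lines of the method do.
def normalise_discover_flags(flags)
  flags = flags.dup # the source mutates its argument; the sketch does not
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless ALLOWED_DISCOVER_FLAGS.include?(key)
  end
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
  flags
end

flags = normalise_discover_flags({ :hosts => ["node1.example.com"], :verbose => true })
```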
#discovery_timeout ⇒ Object
# File 'lib/mcollective/rpc/client.rb', line 353
def discovery_timeout
  return @discovery_timeout if @discovery_timeout
  return @client.discoverer.ddl.[:timeout]
end
#discovery_timeout=(timeout) ⇒ Object
# File 'lib/mcollective/rpc/client.rb', line 358
def discovery_timeout=(timeout)
  @discovery_timeout = Float(timeout)

  # we calculate the overall timeout from the DDL of the agent and
  # the supplied discovery timeout unless someone specifically
  # specifies a timeout to the constructor
  #
  # But if we also then specifically set a discovery_timeout on the
  # agent that has to override the supplied timeout so we then
  # calculate a correct timeout based on DDL timeout and the
  # supplied discovery timeout
  @timeout = @ddl.[:timeout] + discovery_timeout
end
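The timeout arithmetic described in the comments above and in the constructor can be written as one small function. This is a sketch on plain numbers; effective_timeout and DEFAULT_TIMEOUT are names chosen here, and no DDL object is involved.

```ruby
DEFAULT_TIMEOUT = 5

# Effective request timeout as described in the constructor comments: the DDL
# timeout plus the discovery timeout, unless the caller chose a non-default
# value. All arguments are plain numbers.
def effective_timeout(requested, ddl_timeout, discovery_timeout)
  return requested unless requested == DEFAULT_TIMEOUT
  ddl_timeout + discovery_timeout
end
```

For example, with a DDL timeout of 10 and a discovery timeout of 2, a client left at the default waits 12 seconds, while a client created with a timeout of 30 keeps 30. The discovery_timeout= setter shown above skips the default check and always recomputes the sum.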
#fact_filter(fact, value = nil, operator = "=") ⇒ Object
Sets the fact filter
# File 'lib/mcollective/rpc/client.rb', line 414
def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  if value.nil?
    parsed = Util.parse_fact_string(fact)
    @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
  else
    parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
    @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
  end

  @filter["fact"].compact!
  reset
end
#fire_and_forget_request(action, args, filter = nil) ⇒ Object
For requests that do not care about results: returns only the request id and skips all response processing.
The :process_results flag is sent to the nodes so they can make decisions based on it.
Should only be called via method_missing.
# File 'lib/mcollective/rpc/client.rb', line 730
def fire_and_forget_request(action, args, filter=nil)
  validate_request(action, args)

  identity_filter_discovery_optimization

  req = new_request(action.to_s, args)

  filter = options[:filter] unless filter

  message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
  message.reply_to = @reply_to if @reply_to

  if @force_direct_request || @client.discoverer.force_direct_mode?
    message.discovered_hosts = discover.clone
    message.type = :direct_request
  end

  client.sendreq(message, nil)
end
#help(template) ⇒ Object
Returns help for an agent if a DDL was found
# File 'lib/mcollective/rpc/client.rb', line 130
def help(template)
  @ddl.help(template)
end
#identity_filter(identity) ⇒ Object
Sets the identity filter
# File 'lib/mcollective/rpc/client.rb', line 438
def identity_filter(identity)
  @filter["identity"] = @filter["identity"] | [identity]
  @filter["identity"].compact!
  reset
end
#identity_filter_discovery_optimization ⇒ Object
If an identity filter is supplied and every entry is a plain string rather than a regex, the filter can be used directly as discovery data. Technically the identity filter is then redundant in direct addressing mode and could be emptied, but this use case should only really arise for a few -I’s on the CLI.
For safety the filter is left in place for now, so that this enhancement also works in broadcast mode.
This is only needed for the ‘mc’ discovery method. Other methods might change the concept of identity to mean something else, so they should receive the full identity filter.
# File 'lib/mcollective/rpc/client.rb', line 761
def identity_filter_discovery_optimization
  if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
    regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

    if regex_filters == 0
      @discovered_agents = options[:filter]["identity"].clone
      @force_direct_request = true if Config.instance.direct_addressing
    end
  end
end
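The decision made by the method above can be sketched without any MCollective objects. `direct_targets_sketch` is a hypothetical helper that returns the identities usable as discovery data, or nil when the optimization does not apply. It assumes, as the listing does, that a regex identity is one that starts with a slash.

```ruby
# Hypothetical helper mirroring the checks in the listing above.
def direct_targets_sketch(identities, discovery_method)
  # Only the "mc" discovery method treats identities as plain node names.
  return nil unless identities.size > 0 && discovery_method == "mc"

  # Any entry that starts with "/" is a regex, so real discovery is needed.
  return nil if identities.any? { |i| i.start_with?("/") }

  identities.clone
end

p direct_targets_sketch(["web1", "web2"], "mc")
p direct_targets_sketch(["/^web/"], "mc")
```

Returning a clone keeps later changes to the discovered list from altering the filter itself.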
#load_aggregate_functions(action, ddl) ⇒ Object
# File 'lib/mcollective/rpc/client.rb', line 696
def load_aggregate_functions(action, ddl)
  return nil unless ddl
  return nil unless ddl.action_interface(action).keys.include?(:aggregate)

  return Aggregate.new(ddl.action_interface(action))

rescue => e
  Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
  return nil
end
#new_request(action, data) ⇒ Object
Creates a suitable request hash for the SimpleRPC agent.
You’d use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn’t care for responses.
In that case you can just do:
msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter
your_rpc.client.sendreq(msg, msg[:agent], filter)
This will send a SimpleRPC request to the action some_action with arguments :foo = :bar. It returns immediately, and you will have no indication at all of whether the request was received.
Clearly the use of this technique should be limited and done only if your code requires such a thing
# File 'lib/mcollective/rpc/client.rb', line 153
def new_request(action, data)
  callerid = PluginManager["security_plugin"].callerid

  raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

  {:agent => @agent,
   :action => action,
   :caller => callerid,
   :data => data}
end
#options ⇒ Object
Provides a normal options hash like you would get from OptionParser
# File 'lib/mcollective/rpc/client.rb', line 566
def options
  {:disctimeout => discovery_timeout,
   :timeout => @timeout,
   :verbose => @verbose,
   :filter => @filter,
   :collective => @collective,
   :output_format => @output_format,
   :ttl => @ttl,
   :discovery_method => @discovery_method,
   :discovery_options => @discovery_options,
   :force_display_mode => @force_display_mode,
   :config => @config,
   :publish_timeout => @publish_timeout,
   :threaded => @threaded}
end
#pick_nodes_from_discovered(count) ⇒ Object
Pick a number of nodes from the discovered nodes
The count should be a string that can be either just a number or a percentage like 10%
It will select nodes from the discovered list based on the rpclimitmethod configuration option which can be either :first or anything else
- :first would be a simple way to do a distance based selection
- anything else will just pick one at random
- if random chosen, and batch-seed set, then set srand for the generator, and reset afterwards
# File 'lib/mcollective/rpc/client.rb', line 655
def pick_nodes_from_discovered(count)
  if count =~ /%$/
    pct = Integer((discover.size * (count.to_f / 100)))
    pct == 0 ? count = 1 : count = pct
  else
    count = Integer(count)
  end

  return discover if discover.size <= count

  result = []

  if @limit_method == :first
    return discover[0, count]
  else
    # we delete from the discovered list because we want
    # to be sure there is no chance that the same node will
    # be randomly picked twice. So we have to clone the
    # discovered list else this method will only ever work
    # once per discovery cycle and not actually return the
    # right nodes.
    haystack = discover.clone

    if @limit_seed
      haystack.sort!
      srand(@limit_seed)
    end

    count.times do
      rnd = rand(haystack.size)
      result << haystack.delete_at(rnd)
    end

    # Reset random number generator to fresh seed
    # As our seed from options is most likely short
    srand if @limit_seed
  end

  [result].flatten
end
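The selection rules above can be tried out with a standalone sketch. `pick_nodes_sketch` is a hypothetical function, `nodes` stands in for the result of `discover`, and `seed` for the limit seed. One deliberate difference: the sketch uses a private `Random` instance instead of reseeding the global generator with `srand`, so there is nothing to reset afterwards. That is a swapped-in technique, not what the method above does.

```ruby
# Hypothetical standalone version of the selection logic.
def pick_nodes_sketch(nodes, count, limit_method: :random, seed: nil)
  if count.to_s =~ /%$/
    # A percentage that rounds down to zero still selects one node.
    pct = Integer(nodes.size * (count.to_f / 100))
    count = pct == 0 ? 1 : pct
  else
    count = Integer(count)
  end

  return nodes if nodes.size <= count
  return nodes[0, count] if limit_method == :first

  # Work on a copy so that picked nodes can be deleted, which rules out
  # choosing the same node twice.
  haystack = nodes.clone
  haystack.sort! if seed
  rng = seed ? Random.new(seed) : Random.new

  Array.new(count) { haystack.delete_at(rng.rand(haystack.size)) }
end

nodes = %w[a b c d e f g h]
p pick_nodes_sketch(nodes, "25%", limit_method: :first)
p pick_nodes_sketch(nodes, "3", seed: 42).size
```

With a seed the list is sorted first, so the same seed selects the same nodes regardless of the order in which discovery returned them.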
#process_results_with_block(action, resp, block, aggregate) ⇒ Object
Processes client requests by calling a block on each result. In this mode nothing fancy is done with the result objects, and exceptions are raised if there are problems with the data.
# File 'lib/mcollective/rpc/client.rb', line 987
def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  @stats.ok if resp[:body][:statuscode] == 0
  @stats.fail if resp[:body][:statuscode] != 0
  @stats.time_block_execution :start

  case block.arity
    when 1
      block.call(resp)
    when 2
      block.call(resp, result)
  end

  @stats.time_block_execution :end

  return aggregate
end
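The arity dispatch at the end of the method above decides how much the caller's block receives. A minimal sketch, with `dispatch_sketch` as a hypothetical helper and plain values standing in for the reply hash and the parsed result:

```ruby
# Hypothetical helper: one-argument blocks get the raw reply, two-argument
# blocks also get the parsed result, anything else is not called at all.
def dispatch_sketch(resp, result, &block)
  case block.arity
  when 1
    block.call(resp)
  when 2
    block.call(resp, result)
  end
end

reply = {:senderid => "node1"}
p dispatch_sketch(reply, :parsed) { |resp| resp[:senderid] }
p dispatch_sketch(reply, :parsed) { |resp, result| [resp[:senderid], result] }
```

A block with a splat or with no parameters reports a different arity, so under this dispatch it is silently skipped.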
#process_results_without_block(resp, action, aggregate) ⇒ Object
Handles result sets that have no block associated. Sets fails and ok in the stats object and returns a hash of the response to send to the caller.
# File 'lib/mcollective/rpc/client.rb', line 967
def process_results_without_block(resp, action, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
    @stats.ok if resp[:body][:statuscode] == 0
    @stats.fail if resp[:body][:statuscode] == 1
  else
    @stats.fail
  end

  [result, aggregate]
end
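The status handling above reduces to a simple rule: status code 0 counts as ok and every other code counts as a failure. The sketch below tallies a list of replies on that basis. `tally_sketch` and the reply hashes are hypothetical and carry only the fields the rule needs.

```ruby
# Hypothetical helper: counts replies the way the stats object would.
def tally_sketch(replies)
  tally = {:ok => 0, :failed => 0}

  replies.each do |reply|
    if reply[:body][:statuscode] == 0
      tally[:ok] += 1
    else
      tally[:failed] += 1
    end
  end

  tally
end

replies = [
  {:senderid => "node1", :body => {:statuscode => 0}},
  {:senderid => "node2", :body => {:statuscode => 1}},
  {:senderid => "node3", :body => {:statuscode => 5}}
]
p tally_sketch(replies)
```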
#reset ⇒ Object
Resets various internal parts of the class, most importantly it clears out the cached discovery
# File 'lib/mcollective/rpc/client.rb', line 452
def reset
  @discovered_agents = nil
end
#reset_filter ⇒ Object
Reset the filter to an empty one
# File 'lib/mcollective/rpc/client.rb', line 457
def reset_filter
  @filter = Util.empty_filter
  agent_filter @agent
end
#rpc_result_from_reply(agent, action, reply) ⇒ Object
# File 'lib/mcollective/rpc/client.rb', line 717
def rpc_result_from_reply(agent, action, reply)
  Result.new(agent, action, {:sender => reply[:senderid], :statuscode => reply[:body][:statuscode],
                             :statusmsg => reply[:body][:statusmsg], :data => reply[:body][:data]})
end
#validate_request(action, args) ⇒ Object
For the provided action and arguments, the input arguments are modified by filling in any defaults provided in the DDL for arguments that were not supplied in the request
We then pass the modified arguments to the DDL for validation
# File 'lib/mcollective/rpc/client.rb', line 169
def validate_request(action, args)
  raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl

  @ddl.set_default_input_arguments(action, args)
  @ddl.validate_rpc_request(action, args)
end
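The two-step flow above, defaults first and validation second, can be sketched as follows. Everything here is hypothetical: the input specification is a simplified assumed shape, not the real DDL format, and the two helpers only illustrate the order of operations rather than what the DDL methods actually check.

```ruby
# Assumed, simplified input specification for one action.
def example_inputs_sketch
  {:msg   => {:optional => false},
   :count => {:optional => true, :default => 1}}
end

# Fills in defaults for arguments the caller did not supply (mutates args).
def apply_defaults_sketch(inputs, args)
  inputs.each do |name, spec|
    args[name] = spec[:default] if spec.key?(:default) && !args.key?(name)
  end
  args
end

# Raises when a required argument is still missing after defaults.
def validate_sketch(inputs, args)
  inputs.each do |name, spec|
    next if spec[:optional] || args.key?(name)
    raise ArgumentError, "missing required input #{name}"
  end
  true
end

args = {:msg => "hello"}
apply_defaults_sketch(example_inputs_sketch, args)
validate_sketch(example_inputs_sketch, args)
p args
```

Applying defaults before validating means the validator sees the complete argument set that will actually be sent.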