Class: MCollective::RPC::Client

Inherits:
Object
Defined in:
lib/mcollective/rpc/client.rb

Overview

The main component of the Simple RPC client system. It wraps MCollective::Client and adds a lot of convention and standard approaches.

Constant Summary

@@initial_options = nil

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(agent, flags = {}) ⇒ Client

Creates a stub for a remote agent. You can pass in an options hash with the :options flag, which will then be used; otherwise a default set of options is created, with filtering enabled, based on standard command-line use.

rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)

You would not typically call this directly; you’d use MCollective::RPC#rpcclient instead, which is a wrapper around this that can be used as a mixin.



# File 'lib/mcollective/rpc/client.rb', line 20

def initialize(agent, flags = {})
  if flags.include?(:options)
    initial_options = flags[:options]

  elsif @@initial_options
    initial_options = Marshal.load(@@initial_options)

  else
    oparser = MCollective::Optionparser.new({ :verbose => false,
                                              :progress_bar => true,
                                              :mcollective_limit_targets => false,
                                              :batch_size => nil,
                                              :batch_sleep_time => 1 },
                                            "filter")

    initial_options = oparser.parse do |parser, opts|
      if block_given?
        yield(parser, opts)
      end

      Helpers.add_simplerpc_options(parser, opts)
    end

    @@initial_options = Marshal.dump(initial_options)
  end

  @initial_options = initial_options

  @config = initial_options[:config]
  @client = MCollective::Client.new(@initial_options)

  @stats = Stats.new
  @agent = agent
  @timeout = initial_options[:timeout] || 5
  @verbose = initial_options[:verbose]
  @filter = initial_options[:filter] || Util.empty_filter
  @discovered_agents = nil
  @progress = initial_options[:progress_bar]
  @limit_targets = initial_options[:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @limit_seed = initial_options[:limit_seed] || nil
  @output_format = initial_options[:output_format] || :console
  @force_direct_request = false
  @reply_to = initial_options[:reply_to]
  @discovery_method = initial_options[:discovery_method]
  if !@discovery_method
    @discovery_method = Config.instance.default_discovery_method
    @default_discovery_method = true
  else
    @default_discovery_method = false
  end
  @discovery_options = initial_options[:discovery_options] || []
  @force_display_mode = initial_options[:force_display_mode] || false

  @batch_size = initial_options[:batch_size] || Config.instance.default_batch_size
  @batch_sleep_time = Float(initial_options[:batch_sleep_time] || Config.instance.default_batch_sleep_time)
  @batch_mode = determine_batch_mode(@batch_size)

  agent_filter agent

  @discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout

  @collective = @client.collective
  @ttl = initial_options[:ttl] || Config.instance.ttl
  @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
  @threaded = initial_options[:threaded] || Config.instance.threaded

  # if we can find a DDL for the service override
  # the timeout of the client so we always magically
  # wait appropriate amounts of time.
  #
  # We add the discovery timeout to the ddl supplied
  # timeout as the discovery timeout tends to be tuned
  # for local network conditions and fact source speed
  # which would other wise not be accounted for and
  # some results might get missed.
  #
  # We do this only if the timeout is the default 5
  # seconds, so that users cli overrides will still
  # get applied
  #
  # DDLs are required, failure to find a DDL is fatal
  @ddl = DDL.new(agent)
  @stats.ddl = @ddl
  @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5

  # allows stderr and stdout to be overridden for testing
  # but also for web apps that might not want a bunch of stuff
  # generated to actual file handles
  if initial_options[:stderr]
    @stderr = initial_options[:stderr]
  else
    @stderr = STDERR
    @stderr.sync = true
  end

  if initial_options[:stdout]
    @stdout = initial_options[:stdout]
  else
    @stdout = STDOUT
    @stdout.sync = true
  end

  if initial_options[:stdin]
    @stdin = initial_options[:stdin]
  else
    @stdin = STDIN
  end
end
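
The three-way option lookup at the top of the constructor can be tried on its own. The following is a minimal sketch, not the library's code: OptionSource and its resolve method are hypothetical names, and the option parser is replaced by a block that returns a plain hash. It illustrates one likely reason for storing the defaults with Marshal.dump: every later lookup gets its own deep copy, so changing one copy's filter does not leak into the next.

```ruby
# Hypothetical stand-in for the constructor's option lookup.
class OptionSource
  @cached = nil # serialized defaults, shared by all lookups

  class << self
    attr_accessor :cached
  end

  # Returns explicit options if given, else a deep copy of the cached
  # defaults, else builds the defaults once (via the block) and caches them.
  def self.resolve(flags = {})
    return flags[:options] if flags.include?(:options)
    return Marshal.load(cached) if cached

    parsed = yield
    self.cached = Marshal.dump(parsed)
    parsed
  end
end

# Made-up defaults; the real ones come from the option parser.
first  = OptionSource.resolve { { :filter => { "agent" => [] }, :verbose => false } }
second = OptionSource.resolve { raise "defaults should only be built once" }

second[:filter]["agent"] << "rpctest" # mutates this copy only
third = OptionSource.resolve { raise "defaults should only be built once" }
```

Here third still has an empty agent filter, because the cache holds a serialized string rather than a shared object.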

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args, &block) ⇒ Object

Magic handler to invoke remote methods

Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:

ret = rpc.echo(:msg => "hello world")

This will call the ‘echo’ action of the ‘rpctest’ agent and return the result as an array. The array is a simplified result set from the usual full MCollective::Client#req, with additional error codes and error text:

:sender => "remote.box.com",
:statuscode => 0,
:statusmsg => "OK",
:data => "hello world"

If :statuscode is 0 then everything went fine. If it’s 1 then you supplied the correct arguments but the request could not be completed; you’ll find a human-readable reason in :statusmsg.

Codes 2 to 5 map directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError; see below for a description of those. In each case :statusmsg holds the reason for the failure.
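
As a rough illustration of working with these codes, the sketch below splits a result set into successes and failures and labels each failure. It assumes results can be read with hash-style access as in the sample above; STATUS_LABELS and partition_results are hypothetical helpers, and the sample data is hand-written rather than real output.

```ruby
# Labels for the documented status codes. The wording for 0 and 1 is
# paraphrased from the text; 2..5 follow the order the text lists them in.
STATUS_LABELS = {
  0 => "OK",
  1 => "request could not be completed",
  2 => "UnknownRPCAction",
  3 => "MissingRPCData",
  4 => "InvalidRPCData",
  5 => "UnknownRPCError"
}.freeze

# Splits a simplified result set into [successes, failures].
def partition_results(results)
  results.partition { |r| r[:statuscode] == 0 }
end

# Hand-written sample data in the documented shape.
results = [
  { :sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world" },
  { :sender => "other.box.com", :statuscode => 3, :statusmsg => "msg is required", :data => nil }
]

ok, failed = partition_results(results)

failed.each do |r|
  label = STATUS_LABELS.fetch(r[:statuscode], "unknown code")
  puts "#{r[:sender]}: #{label} (#{r[:statusmsg]})"
end
```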

To get access to the full result of the MCollective::Client#req calls you can pass in a block:

rpc.echo(:msg => "hello world") do |resp|
   pp resp
end

In this case resp will be the result from MCollective::Client#req. Instead of receiving simple text and codes as above, you’ll also need to handle the following exceptions:

UnknownRPCAction - There is no matching action on the agent
MissingRPCData - You did not supply all the needed parameters for the action
InvalidRPCData - The data you did supply did not pass validation
UnknownRPCError - Some other error prevented the agent from running

During calls a progress indicator shows how many results have been received against how many nodes were discovered. You can disable this by setting progress to false:

rpc.progress = false

This supports a second mode in which it sends the SimpleRPC request and never handles the responses. It’s a bit like UDP: the request is sent with the filter attached and you only get back the request id, with no indication of results.

You can invoke this using:

puts rpc.echo(:process_results => false)

This will output just the request id.

Batched processing is supported:

printrpc rpc.ping(:batch_size => 5)

This will do everything exactly as normal but communicate with only 5 nodes at a time.



# File 'lib/mcollective/rpc/client.rb', line 241

def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  validate_request(action, args)

  # TODO(ploubser): The logic here seems poor. It implies that it is valid to
  # pass arguments where batch_mode is set to false and batch_mode > 0.
  # If this is the case we completely ignore the supplied value of batch_mode
  # and do our own thing.

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_mode = args.include?(:batch_size) || @batch_mode
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # if we were given a batch_size argument thats 0 and batch_mode was
  # determined to be on via global options etc this will allow a batch_size
  # of 0 to disable or batch_mode for this call only
  batch_mode = determine_batch_mode(batch_size)

  # Handle single target requests by doing discovery and picking
  # a random node.  Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end

Instance Attribute Details

#agent ⇒ Object (readonly)

Returns the value of attribute agent.

# File 'lib/mcollective/rpc/client.rb', line 7

def agent
  @agent
end

#batch_mode ⇒ Object (readonly)

Returns the value of attribute batch_mode.

# File 'lib/mcollective/rpc/client.rb', line 7

def batch_mode
  @batch_mode
end

#batch_size ⇒ Object

Returns the value of attribute batch_size.

# File 'lib/mcollective/rpc/client.rb', line 7

def batch_size
  @batch_size
end

#batch_sleep_time ⇒ Object

Returns the value of attribute batch_sleep_time.

# File 'lib/mcollective/rpc/client.rb', line 7

def batch_sleep_time
  @batch_sleep_time
end

#client ⇒ Object (readonly)

Returns the value of attribute client.

# File 'lib/mcollective/rpc/client.rb', line 7

def client
  @client
end

#config ⇒ Object

Returns the value of attribute config.

# File 'lib/mcollective/rpc/client.rb', line 6

def config
  @config
end

#ddl ⇒ Object (readonly)

Returns the value of attribute ddl.

# File 'lib/mcollective/rpc/client.rb', line 7

def ddl
  @ddl
end

#default_discovery_method ⇒ Object (readonly)

Returns the value of attribute default_discovery_method.

# File 'lib/mcollective/rpc/client.rb', line 8

def default_discovery_method
  @default_discovery_method
end

#discovery_method ⇒ Object

Returns the value of attribute discovery_method.

# File 'lib/mcollective/rpc/client.rb', line 8

def discovery_method
  @discovery_method
end

#discovery_options ⇒ Object

Returns the value of attribute discovery_options.

# File 'lib/mcollective/rpc/client.rb', line 8

def discovery_options
  @discovery_options
end

#filter ⇒ Object

Returns the value of attribute filter.

# File 'lib/mcollective/rpc/client.rb', line 6

def filter
  @filter
end

#limit_method ⇒ Object

Returns the value of attribute limit_method.

# File 'lib/mcollective/rpc/client.rb', line 7

def limit_method
  @limit_method
end

#limit_seed ⇒ Object (readonly)

Returns the value of attribute limit_seed.

# File 'lib/mcollective/rpc/client.rb', line 8

def limit_seed
  @limit_seed
end

#limit_targets ⇒ Object

Returns the value of attribute limit_targets.

# File 'lib/mcollective/rpc/client.rb', line 7

def limit_targets
  @limit_targets
end

#output_format ⇒ Object (readonly)

Returns the value of attribute output_format.

# File 'lib/mcollective/rpc/client.rb', line 7

def output_format
  @output_format
end

#progress ⇒ Object

Returns the value of attribute progress.

# File 'lib/mcollective/rpc/client.rb', line 6

def progress
  @progress
end

#reply_to ⇒ Object

Returns the value of attribute reply_to.

# File 'lib/mcollective/rpc/client.rb', line 6

def reply_to
  @reply_to
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.

# File 'lib/mcollective/rpc/client.rb', line 7

def stats
  @stats
end

#timeout ⇒ Object

Returns the value of attribute timeout.

# File 'lib/mcollective/rpc/client.rb', line 6

def timeout
  @timeout
end

#ttl ⇒ Object

Returns the value of attribute ttl.

# File 'lib/mcollective/rpc/client.rb', line 6

def ttl
  @ttl
end

#verbose ⇒ Object

Returns the value of attribute verbose.

# File 'lib/mcollective/rpc/client.rb', line 6

def verbose
  @verbose
end

Instance Method Details

#agent_filter(agent) ⇒ Object

Sets the agent filter



# File 'lib/mcollective/rpc/client.rb', line 437

def agent_filter(agent)
  @filter["agent"] = @filter["agent"] | [agent]
  @filter["agent"].compact!
  reset
end
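
The filter setters in this class all follow the same pattern of a set union followed by compact!. The sketch below shows that pattern outside the class, using only core Ruby; add_to_filter and the sample filter hash are hypothetical and stand in for the instance state.

```ruby
# Adds a value to one filter list without creating duplicates.
def add_to_filter(filter, key, value)
  filter[key] = filter[key] | [value] # Array#| is a set union
  filter[key].compact!                # drops nil if value was nil
  filter
end

# Hypothetical filter hash with only the key used here.
filter = { "agent" => ["rpcutil"] }

add_to_filter(filter, "agent", "rpctest")
add_to_filter(filter, "agent", "rpctest") # second call changes nothing
add_to_filter(filter, "agent", nil)       # nil is unioned in, then compacted away
```

Because the union removes duplicates, calling a setter repeatedly with the same value leaves the filter unchanged.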

#aggregate_reply(reply, aggregate) ⇒ Object



# File 'lib/mcollective/rpc/client.rb', line 731

def aggregate_reply(reply, aggregate)
  return nil unless aggregate

  aggregate.call_functions(reply)
  return aggregate
rescue Exception => e
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  return nil
end

#call_agent(action, args, opts, disc = :auto, &block) ⇒ Object

Handles traditional calls to the remote agents, with full stats, block and non-block forms, and everything else supported.

Other ways of calling the nodes can reuse this code by, for example, specifying custom options and discovery data.



# File 'lib/mcollective/rpc/client.rb', line 913

def call_agent(action, args, opts, disc=:auto, &block)
  # Handle fire and forget requests and make sure
  # the :process_results value is set appropriately
  #
  # specific reply-to requests should be treated like
  # fire and forget since the client will never get
  # the responses
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args)
  else
    args[:process_results] = true
  end

  # Do discovery when no specific discovery array is given
  #
  # If an array is given set the force_direct_request hint that
  # will tell the message object to be a direct request one
  if disc == :auto
    discovered = discover
  else
    @force_direct_request = true if Config.instance.direct_addressing
    discovered = disc
  end

  req = new_request(action.to_s, args)

  message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
  message.discovered_hosts = discovered.clone

  results = []
  respcount = 0

  if discovered.size > 0
    message.type = :direct_request if @force_direct_request

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    aggregate = load_aggregate_functions(action, @ddl)

    @client.req(message) do |resp|
      respcount += 1

      if block_given?
        aggregate = process_results_with_block(action, resp, block, aggregate)
      else
        @stdout.print twirl.twirl(respcount, discovered.size) if @progress

        result, aggregate = process_results_without_block(resp, action, aggregate)

        results << result
      end
    end

    if @initial_options[:sort]
      results.sort!
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
    @stats.client_stats = @client.stats
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end

#call_agent_batched(action, args, opts, batch_size, sleep_time, &block) ⇒ Object

Calls an agent in a way very similar to call_agent, but supports batching the queries to the network.

The result sets, stats, block handling etc. are all exactly what you would expect from a normal call_agent.

This is used by method_missing and works only in direct addressing mode.



# File 'lib/mcollective/rpc/client.rb', line 808

def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
  validate_batch_size(batch_size)

  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    if (batch_size =~ /^(\d+)%$/)
      # determine batch_size as a percentage of the discovered array's size
      batch_size = (discovered.size / 100.0 * Integer($1)).ceil
    else
      batch_size = Integer(batch_size)
    end

    @stats.requestid = nil
    processed_nodes = 0

    discovered.in_groups_of(batch_size) do |hosts|
      message = Message.new(req, nil, {:agent => @agent,
                                       :type => :direct_request,
                                       :collective => @collective,
                                       :filter => opts[:filter],
                                       :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = message.create_reqid unless @stats.requestid
      message.requestid = @stats.requestid

      message.discovered_hosts = hosts.clone.compact

      @client.req(message) do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      if @initial_options[:sort]
        results.sort!
      end

      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.unexpectedresponsefrom.concat @client.stats[:unexpectedresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      processed_nodes += hosts.length
      if (discovered.length > processed_nodes)
        sleep sleep_time
      end
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
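
The batch sizing step in the listing above can be tried in isolation. This is a sketch under stated assumptions: resolve_batch_size is a hypothetical helper, the host names are made up, and Ruby's core each_slice stands in for the in_groups_of helper that the listing calls. The size is converted with to_s before matching so that an Integer size is handled the same way as a String.

```ruby
# Resolves a batch size given as an Integer, a numeric String, or a
# percentage String such as "10%", against the number of discovered nodes.
def resolve_batch_size(batch_size, discovered_count)
  if (m = batch_size.to_s.match(/\A(\d+)%\z/))
    # percentage of the discovered nodes, rounded up
    (discovered_count / 100.0 * Integer(m[1])).ceil
  else
    Integer(batch_size)
  end
end

nodes = (1..23).map { |i| "node#{i}.example.com" } # made-up hosts

size    = resolve_batch_size("10%", nodes.size)
batches = nodes.each_slice(size).to_a
```

With 23 nodes, "10%" rounds up to batches of 3, which gives 8 batches with the last one holding the 2 remaining nodes.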

#class_filter(klass) ⇒ Object

Sets the class filter



# File 'lib/mcollective/rpc/client.rb', line 413

def class_filter(klass)
  @filter["cf_class"] = @filter["cf_class"] | [klass]
  @filter["cf_class"].compact!
  reset
end

#collective=(c) ⇒ Object

Sets the collective we are communicating with



# File 'lib/mcollective/rpc/client.rb', line 607

def collective=(c)
  raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)

  @collective = c
  @client.options = options
  reset
end

#compound_filter(filter) ⇒ Object

Set a compound filter



# File 'lib/mcollective/rpc/client.rb', line 451

def compound_filter(filter)
  @filter["compound"] = @filter["compound"] |  [Matcher.create_compound_callstack(filter)]
  reset
end

#custom_request(action, args, expected_agents, filter = {}, &block) ⇒ Object

Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will be responding and what your filter would be.

This will help you essentially short-circuit the traditional cycle of:

mc discover / call / wait for discovered nodes

by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.

Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, and so on.

If you just wanted to contact one machine, for example with a client that already has other filter options set up, you can do:

puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})

This will run the runonce action on just ‘your.box.com’. No discovery will be done, and after receiving just one response it will stop waiting for responses.

If direct_addressing is enabled in the config file you can provide an empty hash as a filter. This forces the request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raised.



# File 'lib/mcollective/rpc/client.rb', line 311

def custom_request(action, args, expected_agents, filter = {}, &block)
  validate_request(action, args)

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  if block_given?
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, custom_options, [expected_agents].flatten)
  end
end
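
The filter merge in the listing above can be sketched separately to show which keys survive. The code below is an illustration only: build_custom_filter is a hypothetical helper, and empty_filter is an assumed stand-in for Util.empty_filter that returns one empty list per filter type.

```ruby
FILTER_TYPES = ["identity", "fact", "agent", "cf_class", "compound"].freeze

# Assumed shape of an empty filter: one empty list per filter type.
def empty_filter
  FILTER_TYPES.each_with_object({}) { |ftype, h| h[ftype] = [] }
end

# Copies only recognised filter types from the caller's filter and
# makes sure the call stays restricted to the given agent.
def build_custom_filter(supplied, agent)
  custom = empty_filter

  FILTER_TYPES.each do |ftype|
    next unless supplied.include?(ftype)
    custom[ftype] = [supplied[ftype], custom[ftype]].flatten
  end

  custom["agent"] << agent unless custom["agent"].include?(agent)
  custom
end

filter = build_custom_filter({ "identity" => "your.box.com", "bogus" => "x" }, "puppet")
```

In this sketch the lookup uses string keys, so the unrecognised "bogus" entry is dropped, and a filter keyed with symbols would be skipped in the same way.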

#detect_and_set_stdin_discovery ⇒ Object

Detects data on STDIN and sets the STDIN discovery method.

If the discovery method hasn’t been explicitly overridden,
and we're not being run interactively,
and someone has piped us some data,

then we assume it’s a discovery list. This can be either:

- a list of hosts in plaintext
- JSON that came from another rpc or printrpc

We then override discovery to try to grok the data on STDIN.



# File 'lib/mcollective/rpc/client.rb', line 479

def detect_and_set_stdin_discovery
  if self.default_discovery_method && !@stdin.tty? && !@stdin.eof?
    self.discovery_method = 'stdin'
    self.discovery_options = 'auto'
  end
end
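
The condition in the listing above can be exercised without a real pipe by using StringIO, which also responds to tty? and eof?. This is a sketch with hypothetical helper names (piped_input? and pick_discovery); the return values "stdin" and "unchanged" are labels for the example, not library values.

```ruby
require "stringio"

# True when the input is not a terminal and still has data to read.
def piped_input?(io)
  !io.tty? && !io.eof?
end

# Mirrors the decision in the listing: only switch to stdin discovery
# when the discovery method was left at its default.
def pick_discovery(default_in_use, io)
  if default_in_use && piped_input?(io)
    "stdin"
  else
    "unchanged"
  end
end

piped = StringIO.new("node1.example.com\nnode2.example.com\n") # made-up hosts
empty = StringIO.new("")

pick_discovery(true, piped)  # data available, default method in use
pick_discovery(true, empty)  # nothing to read
pick_discovery(false, piped) # user chose a discovery method explicitly
```

Passing the input stream in, as the constructor's :stdin option allows, is what makes this kind of check testable.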

#disconnect ⇒ Object

Disconnects cleanly from the middleware

# File 'lib/mcollective/rpc/client.rb', line 131

def disconnect
  @client.disconnect
end

#discover(flags = {}) ⇒ Object

Does discovery based on the filters set. If a discovery was previously done, returns that; otherwise does a new discovery.

Alternatively, if identity filters are given and none of them are regular expressions, the provided data is used as the discovered data, avoiding discovery.

Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes, or JSON data like that produced by the mcollective RPC JSON output with :json.

Shows a message indicating it's doing discovery if running verbose or if the :verbose flag is passed in.

Use reset to force a new discovery.



# File 'lib/mcollective/rpc/client.rb', line 501

def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

  reset if flags[:nodes] || flags[:json]

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if flags[:nodes] || flags[:json]
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = []

      if flags[:nodes]
        hosts = Helpers.extract_hosts_from_array(flags[:nodes])
      elsif flags[:json]
        hosts = Helpers.extract_hosts_from_json(flags[:json])
      end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true

    else
      identity_filter_discovery_optimization
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    @client.options = options

    # if compound filters are used the only real option is to use the mc
    # discovery plugin since its the only capable of using data queries etc
    # and we do not want to degrade that experience just to allow compounds
    # on other discovery plugins the UX would be too bad raising complex sets
    # of errors etc.
    @client.discoverer.force_discovery_method_by_filter(options[:filter])

    if verbose
      actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])

      if actual_timeout > 0
        @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
      else
        @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
      end
    end

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    filter = @filter.merge({'collective' => @collective})
    if @limit_method == :first and @limit_targets.is_a?(Integer)
      @discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
    else
      @discovered_agents = @client.discover(filter, discovery_timeout)
    end

    @stderr.puts(@discovered_agents.size) if verbose

    @force_direct_request = @client.discoverer.force_direct_mode?

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end
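
The flag handling at the top of the listing above can be sketched as a small standalone function. normalize_discover_flags is a hypothetical name, and unlike the listing it works on a copy rather than changing the caller's hash; the host name is made up.

```ruby
ALLOWED_DISCOVER_FLAGS = [:verbose, :hosts, :nodes, :json].freeze

# Rejects unknown keys and folds the legacy :hosts key into :nodes.
def normalize_discover_flags(flags)
  flags = flags.dup # leave the caller's hash untouched

  flags.each_key do |key|
    raise "Unknown option #{key} passed to discover" unless ALLOWED_DISCOVER_FLAGS.include?(key)
  end

  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
  flags
end

flags = normalize_discover_flags(:hosts => ["node1.example.com"], :verbose => true)
```

After normalizing, later code only needs to look at :nodes, which is the reason the listing gives for treating :hosts as an alias.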

#discovery_timeout ⇒ Object

# File 'lib/mcollective/rpc/client.rb', line 359

def discovery_timeout
  return @discovery_timeout if @discovery_timeout
  return @client.discoverer.ddl.meta[:timeout]
end

#discovery_timeout=(timeout) ⇒ Object



# File 'lib/mcollective/rpc/client.rb', line 364

def discovery_timeout=(timeout)
  @discovery_timeout = Float(timeout)

  # we calculate the overall timeout from the DDL of the agent and
  # the supplied discovery timeout unless someone specifically
  # specifies a timeout to the constructor
  #
  # But if we also then specifically set a discovery_timeout on the
  # agent that has to override the supplied timeout so we then
  # calculate a correct timeout based on DDL timeout and the
  # supplied discovery timeout
  @timeout = @ddl.meta[:timeout] + discovery_timeout
end
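
The timeout arithmetic is easy to misread, so here is a small worked sketch. It models the constructor's rule, where the sum of DDL timeout and discovery timeout is applied only while the timeout is still the default of 5, whereas the setter above always recomputes the sum. effective_timeout and the numbers used are hypothetical.

```ruby
DEFAULT_TIMEOUT = 5 # the default the constructor compares against

# Returns the timeout the constructor would settle on, given the
# configured value, the agent DDL's timeout, and the discovery timeout.
def effective_timeout(configured, ddl_timeout, discovery_timeout)
  return configured unless configured == DEFAULT_TIMEOUT

  ddl_timeout + discovery_timeout
end

from_ddl = effective_timeout(5, 10, 2)  # default left alone: DDL + discovery
override = effective_timeout(30, 10, 2) # explicit override is kept
```

One consequence of comparing against the literal default is that a caller who explicitly asks for 5 seconds is treated the same as one who set nothing.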

#fact_filter(fact, value = nil, operator = "=") ⇒ Object

Sets the fact filter



# File 'lib/mcollective/rpc/client.rb', line 420

def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  if value.nil?
    parsed = Util.parse_fact_string(fact)
    @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
  else
    parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
    @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
  end

  @filter["fact"].compact!
  reset
end

#fire_and_forget_request(action, args, filter = nil) ⇒ Object

For requests that do not care about results: just returns the request id and doesn’t do any of the response processing.

We send the :process_results flag along to the nodes so they can make decisions based on that.

Should only be called via method_missing.



# File 'lib/mcollective/rpc/client.rb', line 759

def fire_and_forget_request(action, args, filter=nil)
  validate_request(action, args)

  identity_filter_discovery_optimization

  req = new_request(action.to_s, args)

  filter = options[:filter] unless filter

  message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
  message.reply_to = @reply_to if @reply_to

  if @force_direct_request || @client.discoverer.force_direct_mode?
    message.discovered_hosts = discover.clone
    message.type = :direct_request
  end

  client.sendreq(message, nil)
end

#help(template) ⇒ Object

Returns help for an agent if a DDL was found



# File 'lib/mcollective/rpc/client.rb', line 136

def help(template)
  @ddl.help(template)
end

#identity_filter(identity) ⇒ Object

Sets the identity filter



# File 'lib/mcollective/rpc/client.rb', line 444

def identity_filter(identity)
  @filter["identity"] = @filter["identity"] | [identity]
  @filter["identity"].compact!
  reset
end

#identity_filter_discovery_optimization ⇒ Object

If an identity filter is supplied and it contains only plain strings (no regular expressions), it can be used directly as discovery data. Technically the identity filter is then redundant in direct addressing mode and could be emptied out, but this use case should only really apply to a few -I’s on the CLI.

For safety the filter is left in place for now, so that this enhancement can also be supported in broadcast mode.

This is only needed for the ‘mc’ discovery method. Other methods might change the concept of identity to mean something else, so the full identity filter should be passed to them.



# File 'lib/mcollective/rpc/client.rb', line 790

def identity_filter_discovery_optimization
  if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
    regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

    if regex_filters == 0
      @discovered_agents = options[:filter]["identity"].clone
      @force_direct_request = true if Config.instance.direct_addressing
    end
  end
end
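The regex check above can be tried in isolation. In this minimal sketch `discovery_shortcut` is a hypothetical helper name and the host names are made up. It returns the identities to use as discovery data, or nil when any entry looks like a regular expression (starts with a slash) or when no identities are given.

```ruby
# Hypothetical helper: decide whether identity filters can double as discovery data.
def discovery_shortcut(identities)
  return nil if identities.empty?

  # String#match converts the string pattern "^/" into a Regexp,
  # so this selects entries that start with a slash.
  regex_like = identities.select { |i| i.match("^/") }

  regex_like.empty? ? identities.clone : nil
end

plain = discovery_shortcut(["web1.example.com", "web2.example.com"])
mixed = discovery_shortcut(["web1.example.com", "/^db/"])

puts plain.inspect
puts mixed.inspect
```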

#load_aggregate_functions(action, ddl) ⇒ Object



# File 'lib/mcollective/rpc/client.rb', line 720

def load_aggregate_functions(action, ddl)
  return nil unless ddl
  return nil unless ddl.action_interface(action).keys.include?(:aggregate)

  return Aggregate.new(ddl.action_interface(action))

rescue => e
  Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
  return nil
end

#new_request(action, data) ⇒ Object

Creates a suitable request hash for the SimpleRPC agent.

You’d use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn’t care for responses.

In that case you can just do:

msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter

your_rpc.client.sendreq(msg, msg[:agent], filter)

This will send a SimpleRPC request to the action some_action with arguments :foo => :bar. It will return immediately and you will have no indication at all of whether the request was received.

Clearly the use of this technique should be limited and done only if your code requires such a thing



# File 'lib/mcollective/rpc/client.rb', line 159

def new_request(action, data)
  callerid = PluginManager["security_plugin"].callerid

  raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

  {:agent  => @agent,
   :action => action,
   :caller => callerid,
   :data   => data}
end

#options ⇒ Object

Provides a normal options hash like you would get from Optionparser



# File 'lib/mcollective/rpc/client.rb', line 590

def options
  {:disctimeout => discovery_timeout,
   :timeout => @timeout,
   :verbose => @verbose,
   :filter => @filter,
   :collective => @collective,
   :output_format => @output_format,
   :ttl => @ttl,
   :discovery_method => @discovery_method,
   :discovery_options => @discovery_options,
   :force_display_mode => @force_display_mode,
   :config => @config,
   :publish_timeout => @publish_timeout,
   :threaded => @threaded}
end

#pick_nodes_from_discovered(count) ⇒ Object

Pick a number of nodes from the discovered nodes

The count should be a string that can be either just a number or a percentage like 10%

It will select nodes from the discovered list based on the rpclimitmethod configuration option which can be either :first or anything else

- :first takes nodes from the start of the discovered list,
  a simple way to do a distance based selection
- anything else picks the nodes at random
- if random selection is used and a batch seed is set, the
  generator is seeded with srand and reset afterwards


# File 'lib/mcollective/rpc/client.rb', line 679

def pick_nodes_from_discovered(count)
  if count =~ /%$/
    pct = Integer((discover.size * (count.to_f / 100)))
    pct == 0 ? count = 1 : count = pct
  else
    count = Integer(count)
  end

  return discover if discover.size <= count

  result = []

  if @limit_method == :first
    return discover[0, count]
  else
    # we delete from the discovered list because we want
    # to be sure there is no chance that the same node will
    # be randomly picked twice.  So we have to clone the
    # discovered list else this method will only ever work
    # once per discovery cycle and not actually return the
    # right nodes.
    haystack = discover.clone

    if @limit_seed
      haystack.sort!
      srand(@limit_seed)
    end

    count.times do
      rnd = rand(haystack.size)
      result << haystack.delete_at(rnd)
    end

    # Reset random number generator to fresh seed
    # As our seed from options is most likely short
    srand if @limit_seed
  end

  [result].flatten
end
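The selection rules can be exercised without a running collective. The following is a minimal standalone sketch, not the library code: `resolve_count` and `pick_nodes` are hypothetical names, the node list is made up, and the limit method and seed are passed as arguments instead of being read from instance variables.

```ruby
# Hypothetical helper: turn a count such as "3" or "10%" into a number of nodes.
def resolve_count(count, discovered_size)
  if count.to_s =~ /%$/
    pct = Integer(discovered_size * (count.to_f / 100))
    pct.zero? ? 1 : pct # a percentage always selects at least one node
  else
    Integer(count)
  end
end

# Hypothetical helper: pick nodes either from the front of the list or at random.
def pick_nodes(nodes, count, limit_method: :random, seed: nil)
  wanted = resolve_count(count, nodes.size)
  return nodes if nodes.size <= wanted
  return nodes[0, wanted] if limit_method == :first

  # Work on a copy and delete each pick so no node is chosen twice.
  haystack = nodes.clone
  if seed
    haystack.sort! # a stable order makes the seeded picks repeatable
    srand(seed)
  end

  result = Array.new(wanted) { haystack.delete_at(rand(haystack.size)) }

  srand if seed # return the generator to a fresh seed
  result
end

nodes = (1..20).map { |i| format("node%02d", i) }

puts pick_nodes(nodes, "3", limit_method: :first).inspect
puts pick_nodes(nodes, "25%", seed: 42).inspect
```

With a seed, two calls over the same discovered list return the same nodes, which is what makes seeded batches reproducible.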

#process_results_with_block(action, resp, block, aggregate) ⇒ Object

Processes client requests by calling a block on each result. In this mode nothing fancy is done with the result objects, and exceptions are raised if there are problems with the data.



# File 'lib/mcollective/rpc/client.rb', line 1017

def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  @stats.ok if result[:statuscode] == 0
  @stats.fail if result[:statuscode] != 0
  @stats.time_block_execution :start

  case block.arity
    when 1
      block.call(resp)
    when 2
      block.call(resp, result)
  end

  @stats.time_block_execution :end

  return aggregate
end
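The arity check decides which arguments the caller's block receives. The sketch below isolates that dispatch using plain hashes as stand-ins for the raw response and the result object; `dispatch` is a hypothetical helper name.

```ruby
# Hypothetical helper: call the block with one or two arguments based on its arity.
def dispatch(block, resp, result)
  case block.arity
  when 1
    block.call(resp)
  when 2
    block.call(resp, result)
  end
end

# Stand-in values; the real objects come from the client and Result.
resp   = { :senderid => "node1" }
result = { :statuscode => 0 }

seen = []
dispatch(proc { |r| seen << [:one, r] }, resp, result)
dispatch(proc { |r, s| seen << [:two, r, s] }, resp, result)
dispatch(proc { seen << [:none] }, resp, result) # arity 0: block is never called

puts seen.inspect
```

A block whose arity is neither 1 nor 2 is silently skipped, which matches the case statement in the listing above.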

#process_results_without_block(resp, action, aggregate) ⇒ Object

Handles result sets that have no block associated: sets fails and oks in the stats object and returns a hash of the response to send to the caller



# File 'lib/mcollective/rpc/client.rb', line 997

def process_results_without_block(resp, action, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if result[:statuscode] == 0 || result[:statuscode] == 1
    @stats.ok if result[:statuscode] == 0
    @stats.fail if result[:statuscode] == 1
  else
    @stats.fail
  end

  [result, aggregate]
end

#reset ⇒ Object

Resets various internal parts of the class, most importantly it clears out the cached discovery



# File 'lib/mcollective/rpc/client.rb', line 458

def reset
  @discovered_agents = nil
end

#reset_filter ⇒ Object

Reset the filter to an empty one



# File 'lib/mcollective/rpc/client.rb', line 463

def reset_filter
  @filter = Util.empty_filter
  agent_filter @agent
end

#rpc_result_from_reply(agent, action, reply) ⇒ Object



# File 'lib/mcollective/rpc/client.rb', line 741

def rpc_result_from_reply(agent, action, reply)
  senderid = reply.include?("senderid") ? reply["senderid"] : reply[:senderid]
  body = reply.include?("body") ? reply["body"] : reply[:body]
  s_code = body.include?("statuscode") ? body["statuscode"] : body[:statuscode]
  s_msg = body.include?("statusmsg") ? body["statusmsg"] : body[:statusmsg]
  data = body.include?("data") ? body["data"] : body[:data]

  Result.new(agent, action, {:sender => senderid, :statuscode => s_code, :statusmsg => s_msg, :data => data})
end
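The method above accepts replies keyed by either strings or symbols. A minimal sketch of that lookup, assuming a hypothetical `fetch_either` helper and a made-up reply, with a plain hash standing in for the Result object:

```ruby
# Hypothetical helper: read a value stored under a String key or a Symbol key.
def fetch_either(hash, key)
  hash.include?(key.to_s) ? hash[key.to_s] : hash[key.to_sym]
end

# Made-up reply mixing string and symbol keys.
reply = {
  "senderid" => "node1",
  :body      => { "statuscode" => 0, :statusmsg => "OK", :data => { :uptime => 42 } }
}

body = fetch_either(reply, :body)

summary = {
  :sender     => fetch_either(reply, :senderid),
  :statuscode => fetch_either(body, :statuscode),
  :statusmsg  => fetch_either(body, :statusmsg),
  :data       => fetch_either(body, :data)
}

puts summary.inspect
```

The string key is checked first, so when both forms are present the string-keyed value wins, as in the listing above.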

#validate_request(action, args) ⇒ Object

For the provided action, the input arguments are modified by filling in any defaults from the DDL for arguments that were not supplied in the request

We then pass the modified arguments to the DDL for validation



# File 'lib/mcollective/rpc/client.rb', line 175

def validate_request(action, args)
  raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl

  @ddl.set_default_input_arguments(action, args)
  @ddl.validate_rpc_request(action, args)
end