Class: MCollective::RPC::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/rpc/client.rb

Overview

The main component of the Simple RPC client system, this wraps around MCollective::Client and just brings in a lot of convention and standard approached.

Constant Summary collapse

@@initial_options =
nil

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent, flags = {}) ⇒ Client

Creates a stub for a remote agent, you can pass in an options array in the flags which will then be used else it will just create a default options array with filtering enabled based on the standard command line use.

rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)

You typically would not call this directly you’d use MCollective::RPC#rpcclient instead which is a wrapper around this that can be used as a Mixin



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/mcollective/rpc/client.rb', line 19

def initialize(agent, flags = {})
  if flags.include?(:options)
    initial_options = flags[:options]

  elsif @@initial_options
    initial_options = Marshal.load(@@initial_options)

  else
    oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")

    initial_options = oparser.parse do |parser, opts|
      if block_given?
        yield(parser, opts)
      end

      Helpers.add_simplerpc_options(parser, opts)
    end

    @@initial_options = Marshal.dump(initial_options)
  end

  @stats = Stats.new
  @agent = agent
  @discovery_timeout = initial_options[:disctimeout]
  @timeout = initial_options[:timeout]
  @verbose = initial_options[:verbose]
  @filter = initial_options[:filter]
  @config = initial_options[:config]
  @discovered_agents = nil
  @progress = initial_options[:progress_bar]
  @limit_targets = initial_options[:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @output_format = initial_options[:output_format] || :console
  @force_direct_request = false
  @reply_to = initial_options[:reply_to]

  @batch_size = Integer(initial_options[:batch_size] || 0)
  @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
  @batch_mode = @batch_size > 0

  agent_filter agent

  @client = MCollective::Client.new(@config)
  @client.options = initial_options

  @collective = @client.collective
  @ttl = initial_options[:ttl] || Config.instance.ttl

  # if we can find a DDL for the service override
  # the timeout of the client so we always magically
  # wait appropriate amounts of time.
  #
  # We add the discovery timeout to the ddl supplied
  # timeout as the discovery timeout tends to be tuned
  # for local network conditions and fact source speed
  # which would other wise not be accounted for and
  # some results might get missed.
  #
  # We do this only if the timeout is the default 5
  # seconds, so that users cli overrides will still
  # get applied
  begin
    @ddl = DDL.new(agent)
    @timeout = @ddl.meta[:timeout] + @discovery_timeout if @timeout == 5
  rescue Exception => e
    Log.debug("Could not find DDL: #{e}")
    @ddl = nil
  end

  # allows stderr and stdout to be overridden for testing
  # but also for web apps that might not want a bunch of stuff
  # generated to actual file handles
  if initial_options[:stderr]
    @stderr = initial_options[:stderr]
  else
    @stderr = STDERR
    @stderr.sync = true
  end

  if initial_options[:stdout]
    @stdout = initial_options[:stdout]
  else
    @stdout = STDOUT
    @stdout.sync = true
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args, &block) ⇒ Object

Magic handler to invoke remote methods

Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:

ret = rpc.echo(:msg => "hello world")

This will call the ‘echo’ action of the ‘rpctest’ agent and return the result as an array, the array will be a simplified result set from the usual full MCollective::Client#req with additional error codes and error text:

:sender => "remote.box.com",
:statuscode => 0,
:statusmsg => "OK",
:data => "hello world"

If :statuscode is 0 then everything went find, if it’s 1 then you supplied the correct arguments etc but the request could not be completed, you’ll find a human parsable reason in :statusmsg then.

Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError see below for a description of those, in each case :statusmsg would be the reason for failure.

To get access to the full result of the MCollective::Client#req calls you can pass in a block:

rpc.echo(:msg => "hello world") do |resp|
   pp resp
end

In this case resp will the result from MCollective::Client#req. Instead of returning simple text and codes as above you’ll also need to handle the following exceptions:

UnknownRPCAction - There is no matching action on the agent MissingRPCData - You did not supply all the needed parameters for the action InvalidRPCData - The data you did supply did not pass validation UnknownRPCError - Some other error prevented the agent from running

During calls a progress indicator will be shown of how many results we’ve received against how many nodes were discovered, you can disable this by setting progress to false:

rpc.progress = false

This supports a 2nd mode where it will send the SimpleRPC request and never handle the responses. It’s a bit like UDP, it sends the request with the filter attached and you only get back the requestid, you have no indication about results.

You can invoke this using:

puts rpc.echo(:process_results => false)

This will output just the request id.

Batched processing is supported:

printrpc rpc.ping(:batch_size => 5)

This will do everything exactly as normal but communicate to only 5 agents at a time



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/mcollective/rpc/client.rb', line 209

def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  @ddl.validate_request(action, args) if @ddl

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_mode = args.include?(:batch_size) || @batch_mode
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # if we were given a batch_size argument thats 0 and batch_mode was
  # determined to be on via global options etc this will allow a batch_size
  # of 0 to disable or batch_mode for this call only
  batch_mode = (batch_mode && Integer(batch_size) > 0)

  # Handle single target requests by doing discovery and picking
  # a random node.  Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end

Instance Attribute Details

#agentObject (readonly)

Returns the value of attribute agent.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def agent
  @agent
end

#batch_modeObject (readonly)

Returns the value of attribute batch_mode.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def batch_mode
  @batch_mode
end

#batch_sizeObject

Returns the value of attribute batch_size.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def batch_size
  @batch_size
end

#batch_sleep_timeObject

Returns the value of attribute batch_sleep_time.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def batch_sleep_time
  @batch_sleep_time
end

#clientObject (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def client
  @client
end

#configObject

Returns the value of attribute config.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def config
  @config
end

#ddlObject (readonly)

Returns the value of attribute ddl.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def ddl
  @ddl
end

#discovery_timeoutObject

Returns the value of attribute discovery_timeout.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def discovery_timeout
  @discovery_timeout
end

#filterObject

Returns the value of attribute filter.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def filter
  @filter
end

#limit_methodObject

Returns the value of attribute limit_method.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def limit_method
  @limit_method
end

#limit_targetsObject

Returns the value of attribute limit_targets.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def limit_targets
  @limit_targets
end

#output_formatObject (readonly)

Returns the value of attribute output_format.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def output_format
  @output_format
end

#progressObject

Returns the value of attribute progress.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def progress
  @progress
end

#reply_toObject

Returns the value of attribute reply_to.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def reply_to
  @reply_to
end

#statsObject (readonly)

Returns the value of attribute stats.



7
8
9
# File 'lib/mcollective/rpc/client.rb', line 7

def stats
  @stats
end

#timeoutObject

Returns the value of attribute timeout.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def timeout
  @timeout
end

#ttlObject

Returns the value of attribute ttl.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def ttl
  @ttl
end

#verboseObject

Returns the value of attribute verbose.



6
7
8
# File 'lib/mcollective/rpc/client.rb', line 6

def verbose
  @verbose
end

Instance Method Details

#agent_filter(agent) ⇒ Object

Sets the agent filter



347
348
349
350
351
# File 'lib/mcollective/rpc/client.rb', line 347

def agent_filter(agent)
  @filter["agent"] << agent
  @filter["agent"].compact!
  reset
end

#class_filter(klass) ⇒ Object

Sets the class filter



323
324
325
326
327
# File 'lib/mcollective/rpc/client.rb', line 323

def class_filter(klass)
  @filter["cf_class"] << klass
  @filter["cf_class"].compact!
  reset
end

#collective=(c) ⇒ Object

Sets the collective we are communicating with



487
488
489
490
# File 'lib/mcollective/rpc/client.rb', line 487

def collective=(c)
  @collective = c
  @client.options[:collective] = c
end

#compound_filter(filter) ⇒ Object

Set a compound filter



361
362
363
364
# File 'lib/mcollective/rpc/client.rb', line 361

def compound_filter(filter)
  @filter["compound"] << Matcher::Parser.new(filter).execution_stack
  reset
end

#custom_request(action, args, expected_agents, filter = {}, &block) ⇒ Object

Constructs custom requests with custom filters and discovery data the idea is that this would be used in web applications where you might be using a cached copy of data provided by a registration agent to figure out on your own what nodes will be responding and what your filter would be.

This will help you essentially short circuit the traditional cycle of:

mc discover / call / wait for discovered nodes

by doing discovery however you like, contructing a filter and a list of nodes you expect responses from.

Other than that it will work exactly like a normal call, blocks will behave the same way, stats will be handled the same way etcetc

If you just wanted to contact one machine for example with a client that already has other filter options setup you can do:

puppet.custom_request(“runonce”, {}, [“your.box.com”], => “your.box.com”)

This will do runonce action on just ‘your.box.com’, no discovery will be done and after receiving just one response it will stop waiting for responses

If direct_addressing is enabled in the config file you can provide an empty hash as a filter, this will force that request to be a directly addressed request which technically does not need filters. If you try to use this mode with direct addressing disabled an exception will be raise



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/mcollective/rpc/client.rb', line 274

def custom_request(action, args, expected_agents, filter = {}, &block)
  @ddl.validate_request(action, args) if @ddl

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  if block_given?
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, custom_options, [expected_agents].flatten)
  end
end

#disconnectObject

Disconnects cleanly from the middleware



107
108
109
# File 'lib/mcollective/rpc/client.rb', line 107

def disconnect
  @client.disconnect
end

#discover(flags = {}) ⇒ Object

Does discovery based on the filters set, if a discovery was previously done return that else do a new discovery.

Alternatively if identity filters are given and none of them are regular expressions then just use the provided data as discovered data, avoiding discovery

Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes or JSON data like those produced by mcollective RPC JSON output using :json

Will show a message indicating its doing discovery if running verbose or if the :verbose flag is passed in.

Use reset to force a new discovery



393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/mcollective/rpc/client.rb', line 393

def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

  reset if flags[:nodes] || flags[:json]

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if flags[:nodes] || flags[:json]
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = []

      if flags[:nodes]
        hosts = Helpers.extract_hosts_from_array(flags[:nodes])
      elsif flags[:json]
        hosts = Helpers.extract_hosts_from_json(flags[:json])
      end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true

    # if an identity filter is supplied and it is all strings no regex we can use that
    # as discovery data, technically the identity filter is then redundant if we are
    # in direct addressing mode and we could empty it out but this use case should
    # only really be for a few -I's on the CLI
    #
    # For safety we leave the filter in place for now, that way we can support this
    # enhancement also in broadcast mode
    elsif options[:filter]["identity"].size > 0
      regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

      if regex_filters == 0
        @discovered_agents = options[:filter]["identity"].clone
        @force_direct_request = true if Config.instance.direct_addressing
      end
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    @stderr.print("Determining the amount of hosts matching filter for #{discovery_timeout} seconds .... ") if verbose

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    if @limit_method == :first and @limit_targets.is_a?(Fixnum)
      @discovered_agents = @client.discover(@filter, @discovery_timeout, @limit_targets)
    else
      @discovered_agents = @client.discover(@filter, @discovery_timeout)
    end

    @force_direct_request = false
    @stderr.puts(@discovered_agents.size) if verbose

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end

#fact_filter(fact, value = nil, operator = "=") ⇒ Object

Sets the fact filter



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/mcollective/rpc/client.rb', line 330

def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  if value.nil?
    parsed = Util.parse_fact_string(fact)
    @filter["fact"] << parsed unless parsed == false
  else
    parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
    @filter["fact"] << parsed unless parsed == false
  end

  @filter["fact"].compact!
  reset
end

#help(template) ⇒ Object

Returns help for an agent if a DDL was found



112
113
114
115
116
117
118
# File 'lib/mcollective/rpc/client.rb', line 112

def help(template)
  if @ddl
    @ddl.help(template)
  else
    return "Can't find DDL for agent '#{@agent}'"
  end
end

#identity_filter(identity) ⇒ Object

Sets the identity filter



354
355
356
357
358
# File 'lib/mcollective/rpc/client.rb', line 354

def identity_filter(identity)
  @filter["identity"] << identity
  @filter["identity"].compact!
  reset
end

#new_request(action, data) ⇒ Object

Creates a suitable request hash for the SimpleRPC agent.

You’d use this if you ever wanted to take care of sending requests on your own - perhaps via Client#sendreq if you didn’t care for responses.

In that case you can just do:

msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter

your_rpc.client.sendreq(msg, msg[:agent], filter)

This will send a SimpleRPC request to the action some_action with arguments :foo = :bar, it will return immediately and you will have no indication at all if the request was receieved or not

Clearly the use of this technique should be limited and done only if your code requires such a thing



139
140
141
142
143
144
145
146
147
148
# File 'lib/mcollective/rpc/client.rb', line 139

def new_request(action, data)
  callerid = PluginManager["security_plugin"].callerid

  raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

  {:agent  => @agent,
   :action => action,
   :caller => callerid,
   :data   => data}
end

#optionsObject

Provides a normal options hash like you would get from Optionparser



475
476
477
478
479
480
481
482
483
484
# File 'lib/mcollective/rpc/client.rb', line 475

def options
  {:disctimeout => @discovery_timeout,
   :timeout => @timeout,
   :verbose => @verbose,
   :filter => @filter,
   :collective => @collective,
   :output_format => @output_format,
   :ttl => @ttl,
   :config => @config}
end

#resetObject

Resets various internal parts of the class, most importantly it clears out the cached discovery



368
369
370
# File 'lib/mcollective/rpc/client.rb', line 368

def reset
  @discovered_agents = nil
end

#reset_filterObject

Reet the filter to an empty one



373
374
375
376
# File 'lib/mcollective/rpc/client.rb', line 373

def reset_filter
  @filter = Util.empty_filter
  agent_filter @agent
end