Class: MCollective::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/client.rb

Overview

Helpers for writing clients that can talk to agents, do discovery and so forth

Constant Summary collapse

@@request_sequence = 0 # rubocop:disable Style/ClassVars

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/mcollective/client.rb', line 6

# Builds a client and connects it to the middleware.
#
# options is either
# * a String: the path to a config file, loaded unless the Config singleton
#   is already configured; the client options stay nil in this case, or
# * a Hash: :config names the config file, the hash itself becomes the client
#   options, and :connection_timeout / :federations are applied when present.
#
# Any other argument type raises a RuntimeError. When no connection timeout
# is given in the hash the configured one is used; a nil timeout means the
# connect attempt is never timed out. ClientTimeoutError is logged and
# re-raised when the connection cannot be made in time.
def initialize(options)
  @config = Config.instance

  case options
  when String
    # String is the path to a config file
    @options = nil
    @config.loadconfig(options) unless @config.configured
  when Hash
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
    @config.federations = options[:federations] if options[:federations]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @subscriptions = {}
  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified;
  # connection_timeout defaults to nil which means it will try forever
  begin
    Timeout.timeout(@connection_timeout, ClientTimeoutError) { @connection.connect }
  rescue ClientTimeoutError
    Log.error("Timeout occured while trying to connect to middleware")
    raise
  end
end

Instance Attribute Details

#connection_timeoutObject

Returns the value of attribute connection_timeout.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the connection timeout held in @connection_timeout. The
# constructor takes it from options[:connection_timeout] when given a Hash
# and otherwise falls back to the configured connection_timeout.
def connection_timeout
  @connection_timeout
end

#discovererObject

Returns the value of attribute discoverer.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the Discovery instance the constructor created for this client;
# #discover and #req delegate to it.
def discoverer
  @discoverer
end

#optionsObject

Returns the value of attribute options.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the current client options. This is the Hash given to the
# constructor (nil when the client was built from a config file path) and is
# replaced by #req whenever that call supplies its own options.
def options
  @options
end

#statsObject

Returns the value of attribute stats.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Returns the statistics hash stored by the most recent #update_stat call,
# or nil when none has been stored yet.
def stats
  @stats
end

Class Method Details

.request_sequenceObject



52
53
54
# File 'lib/mcollective/client.rb', line 52

# Returns the class-wide request counter, which #createreq increments by
# one for every request it prepares.
def self.request_sequence
  @@request_sequence
end

.reset_request_sequenceObject



48
49
50
# File 'lib/mcollective/client.rb', line 48

# Resets the class-wide request counter back to zero.
def self.reset_request_sequence
  @@request_sequence = 0 # rubocop:disable Style/ClassVars
end

Instance Method Details

#collectiveObject

Returns the configured main collective if no specific collective is specified in the options



58
59
60
61
62
63
64
# File 'lib/mcollective/client.rb', line 58

# Returns the collective this client talks to: the :collective entry of the
# client options when one is set, otherwise the configured main collective.
#
# The options are nil when the client was constructed from a config file
# path instead of an options hash; that case is treated as "no collective
# specified" rather than failing with a NoMethodError on nil.
def collective
  requested = @options && @options[:collective]

  # Only nil means "not specified"; any other value is returned untouched.
  requested.nil? ? @config.main_collective : requested
end

#createreq(msg, agent, filter = {}) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/mcollective/client.rb', line 80

# Prepares a request for publishing and returns it.
#
# msg may already be a Message, in which case it is used as is and its own
# agent replaces the agent argument. Anything else is wrapped in a new
# request Message for the agent, addressed to the current collective with
# the given filter, using the :ttl option or the configured ttl, and the
# :reply_to option when one is set.
#
# Every call bumps the class-wide request sequence. The request is encoded,
# and unless it carries a custom reply_to the client subscribes to the
# agent's reply target.
def createreq(msg, agent, filter={})
  if msg.is_a?(Message)
    request = msg
    agent = msg.agent
  else
    ttl = @options[:ttl] || @config.ttl
    request = Message.new(msg, nil, :agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl)

    custom_reply_to = @options[:reply_to]
    request.reply_to = custom_reply_to if custom_reply_to
  end

  @@request_sequence += 1 # rubocop:disable Style/ClassVars

  request.encode!

  # Replies sent to a custom reply_to target are not collected by this client
  subscribe(agent, :reply) unless request.reply_to

  request
end

#disconnectObject

Disconnects cleanly from the middleware



67
68
69
70
# File 'lib/mcollective/client.rb', line 67

# Logs the disconnect at debug level and then asks the connector plugin
# to disconnect from the middleware.
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end

#discover(filter, timeout, limit = 0) ⇒ Object

Performs a discovery of nodes matching the filter passed and returns an array of nodes.

An integer limit can be supplied; this will have the effect of the discovery being cancelled as soon as it has reached the requested limit of hosts.



153
154
155
# File 'lib/mcollective/client.rb', line 153

# Runs a discovery through the discoverer and returns whatever it returns.
#
# The filter is not modified; a copy with the current collective stored
# under the "collective" key is what gets passed on, together with the
# timeout, the limit and this client.
def discover(filter, timeout, limit=0)
  scoped_filter = filter.merge("collective" => collective)
  @discoverer.discover(scoped_filter, timeout, limit, self)
end

#discovered_req(body, agent, options = false) ⇒ Object



304
305
306
# File 'lib/mcollective/client.rb', line 304

# Removed API kept only so old callers get a clear error: always raises a
# RuntimeError pointing at the SimpleRPC framework. The arguments are ignored.
def discovered_req(body, agent, options=false)
  removal_notice = "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
  raise(RuntimeError, removal_notice)
end

#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object

Prints out the stats returned from req and discovered_req in a nice way



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
# File 'lib/mcollective/client.rb', line 309

# Prints a summary of a stats hash to standard output.
#
# stats   - hash as produced by #update_stat; :responses and :blocktime are
#           always read, :discovered is optional, and the verbose summary
#           also reads :starttime, :discoverytime and :totaltime
# options - hash whose :verbose flag selects the detailed summary; falls
#           back to the client options, and to an empty hash when neither
#           is available
# caption - heading used for the verbose summary
#
# After the summary the hosts listed under :noresponsefrom and
# :unexpectedresponsefrom are printed, four right-aligned names per row.
# Always returns nil.
def display_stats(stats, options=false, caption="stomp call summary")
  options ||= @options || {}

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  host_lists = [
    [:noresponsefrom, "\nNo response from:\n"],
    [:unexpectedresponsefrom, "\nUnexpected response from:\n"]
  ]

  host_lists.each do |key, heading|
    # Array() lets a stats hash without this key behave like an empty list
    hosts = Array(stats[key])
    next if hosts.empty?

    puts(heading)

    # Wrap by position in the list. The previous `c % 4 == 1` check was
    # applied to the host entry itself, so the row break never happened
    # and a name containing '%' could raise.
    hosts.each_slice(4) do |row|
      puts(row.map { |host| format("%30s", host) }.join)
    end
  end

  nil
end

#publish(request) ⇒ Object



237
238
239
240
# File 'lib/mcollective/client.rb', line 237

# Logs which request is about to be sent (id, agent, ttl and collective) at
# info level, then publishes the request and returns the result of that call.
def publish(request)
  announcement = "Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'"
  Log.info(announcement)

  request.publish
end

#receive(requestid = nil) ⇒ Object

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you’ve previously sent a request with that ID and now you just want replies that match that id. In that case the current connection will just ignore all messages not directed at it and keep waiting for more until it finds a matching message.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/mcollective/client.rb', line 123

# Blocks until a reply matching requestid arrives and returns it.
#
# Each message taken from the connection is marked as a reply, told which
# request id to expect and decoded. Messages that fail security validation
# or that carry a different request id are logged and skipped, and the wait
# simply starts over; there is no limit on how often that happens, so
# callers needing an upper bound must wrap the call in a timeout.
def receive(requestid=nil)
  reply = @connection.receive
  reply.type = :reply
  reply.expected_msgid = requestid

  reply.decode!

  unless reply.requestid == requestid
    raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
  end

  Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")

  reply
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID => mismatch
  Log.debug("Ignoring a message for some other client : #{mismatch.message}")
  retry
end

#req(body, agent = nil, options = false, waitfor = [], &block) ⇒ Object

Send a request, performs the passed block for each response

  times = req("status", "mcollectived", options, client) {|resp|
    pp resp
  }

It returns a hash of times. Timeouts for discovery and total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/mcollective/client.rb', line 165

# Sends a request and hands each response to the passed block.
#
# body    - the request payload, or a Message; a Message supplies the agent,
#           the hosts to wait for (its discovered hosts, or none) and the
#           client options
# agent   - target agent, ignored when body is a Message
# options - when truthy, replaces the client options for this and later calls
# waitfor - hosts (Array) or number of replies to wait for
#
# The :threaded option picks between the threaded and unthreaded request
# flow. An Interrupt during the request is swallowed on purpose so that the
# statistics gathered so far are still recorded, and the reply subscription
# is always removed afterwards. Returns the stats hash stored by
# #update_stat.
def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end

  @options = options if options

  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  $stdout.sync = true

  # Stays at zero when the request is interrupted before it returns
  hosts_responded = 0

  begin
    hosts_responded =
      if threaded
        threaded_req(request, publish_timeout, timeout, waitfor, &block)
      else
        unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
      end
  rescue Interrupt # rubocop:disable Lint/SuppressedException
  ensure
    unsubscribe(agent, :reply)
  end

  update_stat(stat, hosts_responded, request.requestid)
end

#sendreq(msg, agent, filter = {}) ⇒ Object

Sends a request and returns the generated request id. It doesn’t wait for responses and doesn’t execute any passed-in code blocks for responses.



74
75
76
77
78
# File 'lib/mcollective/client.rb', line 74

# Fire-and-forget send: prepares the request through #createreq, publishes
# it and returns its request id. No replies are collected here.
def sendreq(msg, agent, filter={})
  createreq(msg, agent, filter).tap { |prepared| publish(prepared) }.requestid
end

#start_publisher(request, publish_timeout) ⇒ Object

Starts the request publishing routine



226
227
228
229
230
231
232
233
234
235
# File 'lib/mcollective/client.rb', line 226

# Publishes the request, giving up after publish_timeout seconds.
#
# Running out of time is not treated as an error: a warning is logged and
# the method returns normally, so the caller carries on and collects
# whatever replies do come back.
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")

  Timeout.timeout(publish_timeout) { publish(request) }
rescue Timeout::Error
  Log.warn("Could not publish all messages. Publishing timed out.")
end

#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object

Starts the response receiver routine. Expected to return the number of received responses.



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/mcollective/client.rb', line 244

# Starts the response receiver routine and returns the number of responses
# that were received.
#
# requestid - only replies to this request id are accepted (see #receive)
# waitfor   - either an Array of sender ids expected to reply (a sender
#             listed several times is expected to reply that many times;
#             an empty Array means keep receiving until the timeout), or a
#             number of replies to wait for, where 0 also means keep
#             receiving until the timeout
# timeout   - seconds to keep receiving for, handed to Timeout.timeout
# block     - optional; called with the reply payload, or with the payload
#             and the reply itself when it takes exactly two arguments.
#             Without a block the replies are only counted.
#
# Running into the timeout is not an error. A warning is logged when
# expected replies are still missing, and the count so far is returned.
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0

  if waitfor.is_a?(Array)
    # Outstanding reply count per expected sender
    unfinished = Hash.new(0)
    waitfor.each {|w| unfinished[w] += 1}
  else
    unfinished = []
  end

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        # The block is optional for callers such as #req; calling arity on a
        # missing block used to raise NoMethodError on the first reply.
        if block
          if block.arity == 2
            yield resp.payload, resp
          else
            yield resp.payload
          end
        end

        hosts_responded += 1

        if waitfor.is_a?(Array)
          sender = resp.payload[:senderid]
          if unfinished[sender] <= 1
            unfinished.delete(sender)
          else
            unfinished[sender] -= 1
          end

          # An empty waitfor list never finishes early; only the timeout ends it
          break if !waitfor.empty? && unfinished.empty?
        else
          break unless waitfor == 0 || hosts_responded < waitfor
        end
      end
    end
  rescue Timeout::Error
    if waitfor.is_a?(Array)
      Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}") unless unfinished.empty?
    elsif waitfor > hosts_responded
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end

#subscribe(agent, type) ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/mcollective/client.rb', line 97

# Subscribes to the given target type of an agent in the current
# collective, at most once per agent: agents already recorded in
# @subscriptions are skipped and nil is returned.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")
  Util.subscribe(target)

  # Remember the agent so repeated calls do not subscribe again
  @subscriptions[agent] = 1
end

#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher in threads. This is activated when the ‘threader_client’ configuration option is set.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/mcollective/client.rb', line 205

# Runs the publisher and the receiver in their own threads and returns the
# number of hosts that responded, as reported by #start_receiver.
#
# Only the receiver thread is waited for; the publisher thread is left to
# finish on its own.
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  Thread.new { start_publisher(request, publish_timeout) }

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  receive_window = publish_timeout + timeout

  receiver = Thread.new do
    start_receiver(request.requestid, waitfor, receive_window, &block)
  end

  # Thread#value waits for the receiver and hands back its result
  receiver.value
end

#unsubscribe(agent, type) ⇒ Object



107
108
109
110
111
112
113
114
115
# File 'lib/mcollective/client.rb', line 107

# Removes the subscription for the given target type of an agent in the
# current collective. Agents not recorded in @subscriptions are ignored
# and nil is returned.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")
  Util.unsubscribe(target)

  # Forget the agent so a later subscribe call subscribes again
  @subscriptions.delete(agent)
end

#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher unthreaded. This is the default client behaviour.



197
198
199
200
# File 'lib/mcollective/client.rb', line 197

# Default request flow: publishes the request first and only then starts
# collecting replies, all on the calling thread. Returns what
# #start_receiver returns, i.e. the number of responses received.
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)

  request_id = request.requestid
  start_receiver(request_id, waitfor, timeout, &block)
end

#update_stat(stat, hosts_responded, requestid) ⇒ Object



293
294
295
296
297
298
299
300
301
302
# File 'lib/mcollective/client.rb', line 293

# Completes a stats hash after a request has finished, remembers it as the
# client's latest stats and returns it.
#
# The total time is measured from stat[:starttime] until now, the block time
# is the total minus the discovery time, and the two host lists are reset to
# empty arrays. The hash passed in is modified in place.
def update_stat(stat, hosts_responded, requestid)
  elapsed = Time.now.to_f - stat[:starttime]

  stat.update(
    :totaltime => elapsed,
    :blocktime => elapsed - stat[:discoverytime],
    :responses => hosts_responded,
    :noresponsefrom => [],
    :unexpectedresponsefrom => [],
    :requestid => requestid
  )

  @stats = stat
end