Class: MCollective::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/client.rb

Overview

Helpers for writing clients that can talk to agents, do discovery and so forth

Constant Summary collapse

@@request_sequence =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/mcollective/client.rb', line 6

# Builds a client and connects it to the middleware.
#
# options is either a String holding the path to a config file, or a Hash
# carrying that path under :config and optionally a :connection_timeout.
# Anything else raises a RuntimeError. The configuration is only loaded when
# it has not been configured yet.
#
# Raises ClientTimeoutError when the connection attempt exceeds the timeout.
def initialize(options)
  @config = Config.instance
  @options = nil

  case options
  when String
    # A bare string is the path to a config file
    @config.loadconfig(options) unless @config.configured
  when Hash
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  # Fall back to the configured timeout when the options did not supply one
  @connection_timeout = @config.connection_timeout unless @connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  @security.initiated_by = :client
  @subscriptions = {}

  @discoverer = Discovery.new(self)

  # The connection attempt is time boxed only when a timeout is known; the
  # timeout defaults to nil, in which case connecting is retried forever.
  begin
    Timeout::timeout(@connection_timeout, ClientTimeoutError) { @connection.connect }
  rescue ClientTimeoutError => e
    Log.error("Timeout occured while trying to connect to middleware")
    raise e
  end
end

Instance Attribute Details

#connection_timeout ⇒ Object

Returns the value of attribute connection_timeout.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @connection_timeout. The constructor takes the value from
# options[:connection_timeout] when given a Hash and falls back to the
# configuration's connection_timeout when that leaves it unset.
def connection_timeout
  @connection_timeout
end

#discoverer ⇒ Object

Returns the value of attribute discoverer.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @discoverer, the Discovery instance the constructor creates for
# this client.
def discoverer
  @discoverer
end

#options ⇒ Object

Returns the value of attribute options.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @options: the Hash handed to the constructor, or nil when the
# constructor was given a config file path. req overwrites it with the
# options of the request it is handling.
def options
  @options
end

#stats ⇒ Object

Returns the value of attribute stats.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @stats, the statistics hash stored by the most recent
# update_stat call.
def stats
  @stats
end

Class Method Details

.request_sequence ⇒ Object



45
46
47
# File 'lib/mcollective/client.rb', line 45

# Returns the class-wide @@request_sequence counter, which createreq
# increments once for every request it builds.
def self.request_sequence
  @@request_sequence
end

Instance Method Details

#collective ⇒ Object

Returns the collective specified in the options, or the configured main collective if none was specified.



51
52
53
54
55
56
57
# File 'lib/mcollective/client.rb', line 51

# Returns the collective this client addresses.
#
# The :collective entry of the client options wins when it is set; otherwise
# the configured main collective is returned. A client built from a config
# file path has no options hash (@options is nil); that is treated the same
# as no collective having been chosen instead of raising NoMethodError.
def collective
  chosen = @options[:collective] if @options
  return @config.main_collective if chosen.nil?

  chosen
end

#createreq(msg, agent, filter = {}) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/mcollective/client.rb', line 73

# Builds an encoded request and makes sure replies to it can be received.
#
# msg    - either a ready-made Message, which is used as is and whose agent
#          replaces the agent argument, or a payload wrapped in a new request
#          Message for agent, the current collective and filter
# agent  - name of the target agent (ignored when msg is a Message)
# filter - filter placed on a newly built Message
#
# The ttl and reply_to of a newly built Message come from the client options,
# with the configured ttl as fallback. Every call bumps the class-wide
# request sequence. Returns the encoded request.
def createreq(msg, agent, filter = {})
  if msg.is_a?(Message)
    request = msg
    agent = request.agent
  else
    time_to_live = @options[:ttl] || @config.ttl
    message_options = {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => time_to_live}
    request = Message.new(msg, nil, message_options)

    reply_target = @options[:reply_to]
    request.reply_to = reply_target if reply_target
  end

  @@request_sequence += 1

  request.encode!

  # Replies sent to a custom reply_to target are not ours to collect, so the
  # reply subscription is only needed when none is set
  subscribe(agent, :reply) unless request.reply_to
  request
end

#disconnect ⇒ Object

Disconnects cleanly from the middleware



60
61
62
63
# File 'lib/mcollective/client.rb', line 60

# Logs the disconnect at debug level and asks the connector plugin held in
# @connection to disconnect. Returns whatever the connector's disconnect
# call returns.
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end

#discover(filter, timeout, limit = 0) ⇒ Object

Performs a discovery of nodes matching the passed filter and returns an array of nodes.

An integer limit can be supplied; this has the effect of cancelling the discovery as soon as it reaches the requested number of hosts.



147
148
149
# File 'lib/mcollective/client.rb', line 147

# Runs a discovery through the client's discoverer.
#
# filter  - Hash of discovery filters; a copy with the 'collective' key set
#           to the current collective is what gets passed on
# timeout - passed through to the discoverer
# limit   - passed through to the discoverer (0 by default)
#
# Returns whatever the discoverer returns.
def discover(filter, timeout, limit=0)
  scoped_filter = filter.merge('collective' => collective)
  @discoverer.discover(scoped_filter, timeout, limit)
end

#discovered_req(body, agent, options = false) ⇒ Object



300
301
302
# File 'lib/mcollective/client.rb', line 300

# Removed API kept only to tell callers what to do instead: every call raises
# a RuntimeError pointing at the SimpleRPC framework. The arguments are
# ignored.
def discovered_req(body, agent, options=false)
  raise RuntimeError, "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end

#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object

Prints out the stats returned from req and discovered_req in a nice way



305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/mcollective/client.rb', line 305

# Prints a summary of a stats hash to standard output.
#
# stats   - Hash read for :responses, :blocktime, :noresponsefrom and
#           :unexpectedresponsefrom, plus :discovered when present; verbose
#           output additionally reads :starttime, :discoverytime and
#           :totaltime
# options - Hash consulted for :verbose; when false/nil the client options
#           are used, and terse output is printed if the client has none
# caption - heading of the verbose summary
#
# Hosts that did not respond, or that responded unexpectedly, are listed four
# to a line in right aligned columns 30 characters wide. Returns nil.
def display_stats(stats, options=false, caption="stomp call summary")
  options ||= @options || {}

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  host_lists = [
    [:noresponsefrom, "\nNo response from:\n"],
    [:unexpectedresponsefrom, "\nUnexpected response from:\n"]
  ]

  host_lists.each do |key, heading|
    hosts = stats[key]
    next unless hosts.size > 0

    puts(heading)

    # The entries are host identities, not indexes, so rows are built by
    # slicing the list rather than by doing arithmetic on each entry
    hosts.each_slice(4) do |row|
      row.each { |host| printf("%30s", host) }
      puts
    end
  end

  nil
end

#publish(request) ⇒ Object



231
232
233
234
# File 'lib/mcollective/client.rb', line 231

# Logs the request id, agent, ttl and collective of request at info level and
# then publishes it. Returns whatever request.publish returns.
def publish(request)
  summary = "Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'"
  Log.info(summary)

  request.publish
end

#receive(requestid = nil) ⇒ Object

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you have previously sent a request with that ID and now only want replies that match it. In that case the current connection ignores all messages not directed at it and keeps waiting until it finds a matching message.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/mcollective/client.rb', line 115

# Blocks until a reply for requestid arrives and returns it.
#
# Each message read from the connection is marked as a reply, told which
# request id is expected and decoded. Messages that fail security validation
# or that carry a different request id are logged and skipped, after which
# the next message is read; there is no limit on how often that happens.
def receive(requestid = nil)
  reply = @connection.receive
  reply.type = :reply
  reply.expected_msgid = requestid

  reply.decode!

  raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}") unless reply.requestid == requestid

  Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")

  reply
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID => mismatch
  Log.debug("Ignoring a message for some other client : #{mismatch.message}")
  retry
end

#req(body, agent = nil, options = false, waitfor = [], &block) ⇒ Object

Sends a request and performs the passed block for each response:

times = req("status", "mcollectived", options, client) {|resp|

pp resp

}

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/mcollective/client.rb', line 159

# Sends a request and hands every reply to the passed block.
#
# body    - payload for a new request, or a Message; a Message supplies the
#           agent, the hosts to wait for and the client options itself
# agent   - target agent (taken from body when body is a Message)
# options - when given, replaces the client options for this and later calls
# waitfor - passed on to the receiver to decide when to stop waiting
#
# The reply subscription for the agent is dropped again once the request has
# finished, also when it was interrupted; an Interrupt is swallowed so that
# statistics are still produced. Returns the result of update_stat.
def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end

  @options = options if options
  use_threads = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  responded = 0

  begin
    responded = if use_threads
                  threaded_req(request, publish_timeout, timeout, waitfor, &block)
                else
                  unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
                end
  rescue Interrupt
    # deliberately ignored: an interrupted request still reports its stats
  ensure
    unsubscribe(agent, :reply)
  end

  update_stat(stat, responded, request.requestid)
end

#sendreq(msg, agent, filter = {}) ⇒ Object

Sends a request and returns the generated request id. It doesn’t wait for responses and doesn’t execute any passed-in code blocks for responses.



67
68
69
70
71
# File 'lib/mcollective/client.rb', line 67

# Builds a request through createreq, publishes it and returns its request
# id. Replies are neither waited for nor processed here.
def sendreq(msg, agent, filter = {})
  createreq(msg, agent, filter).tap { |outgoing| publish(outgoing) }.requestid
end

#start_publisher(request, publish_timeout) ⇒ Object

Starts the request publishing routine



220
221
222
223
224
225
226
227
228
229
# File 'lib/mcollective/client.rb', line 220

# Publishes request, giving up after publish_timeout seconds.
#
# A publish that runs out of time is not fatal: it is logged as a warning and
# the method returns normally.
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")

  Timeout.timeout(publish_timeout) { publish(request) }
rescue Timeout::Error
  Log.warn("Could not publish all messages. Publishing timed out.")
end

#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object

Starts the response receiver routine. Expected to return the number of received responses.



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/mcollective/client.rb', line 238

# Collects replies for requestid until enough have arrived or timeout
# seconds have passed, yielding each one to the block.
#
# waitfor - an Array of sender ids, in which case receiving stops once every
#           listed sender has answered as often as it is listed (an empty
#           Array never stops early); or a number of replies to wait for,
#           where 0 means keep receiving until the timeout
#
# The block is yielded the reply payload, plus the reply itself when it takes
# two arguments. Running out of time is logged as a warning when replies are
# still missing. Returns the number of replies received.
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0
  by_sender = waitfor.is_a?(Array)

  # Outstanding reply count per sender; only meaningful in Array mode
  unfinished = []
  if by_sender
    unfinished = Hash.new(0)
    waitfor.each { |sender| unfinished[sender] += 1 }
  end

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        if block.arity == 2
          yield(resp.payload, resp)
        else
          yield(resp.payload)
        end

        hosts_responded += 1

        if by_sender
          sender = resp.payload[:senderid]
          if unfinished[sender] > 1
            unfinished[sender] -= 1
          else
            unfinished.delete(sender)
          end

          break if !waitfor.empty? && unfinished.empty?
        elsif waitfor != 0 && !(hosts_responded < waitfor)
          break
        end
      end
    end
  rescue Timeout::Error
    if by_sender
      Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}") unless unfinished.empty?
    elsif waitfor > hosts_responded
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end

#subscribe(agent, type) ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/mcollective/client.rb', line 90

# Subscribes to the type target of agent in the current collective, unless
# this client already holds a subscription for that agent. Subscriptions are
# tracked per agent name only.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")

  Util.subscribe(targets)
  @subscriptions[agent] = 1
end

#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher in threads. This is used when the :threaded option is set.



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/mcollective/client.rb', line 199

# Publishes request and collects its replies on two separate threads.
#
# Only the receiver thread is waited for. Returns the number of responses
# reported by start_receiver.
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  Thread.new { start_publisher(request, publish_timeout) }

  # The receiver gets the publish timeout on top of the agent timeout so it
  # cannot give up before publishing has finished when
  # publish_timeout >= timeout.
  receive_window = publish_timeout + timeout
  responded = 0

  listener = Thread.new do
    responded = start_receiver(request.requestid, waitfor, receive_window, &block)
  end
  listener.join

  responded
end

#unsubscribe(agent, type) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/mcollective/client.rb', line 100

# Drops the type subscription for agent in the current collective. Nothing
# happens when this client holds no subscription for that agent.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")

  Util.unsubscribe(targets)
  @subscriptions.delete(agent)
end

#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher unthreaded. This is the default client behaviour.



191
192
193
194
# File 'lib/mcollective/client.rb', line 191

# Publishes the request and then collects the replies, one step after the
# other on the calling thread. Returns the value of start_receiver, which is
# the number of responses it received.
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)
  start_receiver(request.requestid, waitfor, timeout, &block)
end

#update_stat(stat, hosts_responded, requestid) ⇒ Object



289
290
291
292
293
294
295
296
297
298
# File 'lib/mcollective/client.rb', line 289

# Completes the statistics of a finished request.
#
# stat            - Hash that already holds :starttime and :discoverytime
# hosts_responded - number of replies received
# requestid       - id of the request the stats belong to
#
# Fills in the elapsed times, the response count, empty :noresponsefrom and
# :unexpectedresponsefrom lists and the request id, remembers the hash in
# @stats and returns it.
def update_stat(stat, hosts_responded, requestid)
  elapsed = Time.now.to_f - stat[:starttime]

  stat.update(
    :totaltime => elapsed,
    :blocktime => elapsed - stat[:discoverytime],
    :responses => hosts_responded,
    :noresponsefrom => [],
    :unexpectedresponsefrom => [],
    :requestid => requestid
  )

  @stats = stat
end