Class: MCollective::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/client.rb

Overview

Helpers for writing clients that can talk to agents, do discovery and so forth

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configfile) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/mcollective/client.rb', line 6

# Sets up a client: makes sure the configuration is loaded, fetches the
# connector and security plugins, creates the discovery helper and connects.
#
# configfile - handed to Config#loadconfig, and only when the Config instance
#              does not already report itself as configured.
def initialize(configfile)
  @config = Config.instance
  # Skip loading when the Config instance says it is already configured.
  @config.loadconfig(configfile) unless @config.configured

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]

  # Mark the security plugin as being driven from the client side.
  @security.initiated_by = :client
  # No options yet; #req assigns them when a request is made.
  @options = nil
  # Tracks which agents #subscribe has already subscribed to (agent => 1).
  @subscriptions = {}

  # The discovery helper receives this client; the connection is opened last.
  @discoverer = Discovery.new(self)
  @connection.connect
end

Instance Attribute Details

#discovererObject

Returns the value of attribute discoverer.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @discoverer, the Discovery object created in #initialize.
def discoverer
  @discoverer
end

#optionsObject

Returns the value of attribute options.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @options. This is nil after construction and is replaced by #req
# with the options passed to it (or carried by a Message body).
def options
  @options
end

#statsObject

Returns the value of attribute stats.



4
5
6
# File 'lib/mcollective/client.rb', line 4

# Reader for @stats, the hash stored by the most recent #update_stat call.
# Returns nil when #update_stat has not run yet.
def stats
  @stats
end

Instance Method Details

#collectiveObject

Returns the collective given in the options, or the configured main collective if the options do not specify one



23
24
25
26
27
28
29
# File 'lib/mcollective/client.rb', line 23

# Returns the collective to address: the :collective entry of the options when
# one is present, otherwise the configured main collective.
#
# @options is nil right after construction, so a missing options hash is
# treated the same as a missing :collective entry instead of raising
# NoMethodError on nil.
def collective
  requested = @options ? @options[:collective] : nil
  # Only nil falls back; any other value (including false) is returned as is.
  requested.nil? ? @config.main_collective : requested
end

#createreq(msg, agent, filter = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/mcollective/client.rb', line 48

# Turns msg into an encoded request and makes sure replies can be received.
#
# msg    - either a Message (used as is, its own agent replaces the agent
#          argument) or a payload from which a new request Message is built.
# agent  - agent name used when a new Message has to be built.
# filter - filter stored on a newly built Message.
#
# A newly built Message takes its ttl from the options, falling back to the
# configured ttl, and gets the :reply_to option when one is set. The request
# is encoded, and the client subscribes to the agent's reply target unless
# the request carries a reply_to. Returns the request.
def createreq(msg, agent, filter = {})
  if msg.is_a?(Message)
    request = msg
    agent = msg.agent
  else
    ttl = @options[:ttl] || @config.ttl
    settings = {
      :agent => agent,
      :type => :request,
      :collective => collective,
      :filter => filter,
      :ttl => ttl
    }
    request = Message.new(msg, nil, settings)

    custom_reply_to = @options[:reply_to]
    request.reply_to = custom_reply_to if custom_reply_to
  end

  request.encode!
  subscribe(agent, :reply) unless request.reply_to
  request
end

#disconnectObject

Disconnects cleanly from the middleware



32
33
34
35
# File 'lib/mcollective/client.rb', line 32

# Logs a debug message and then asks the connector to disconnect.
def disconnect
  Log.debug("Disconnecting from the middleware")
  @connection.disconnect
end

#discover(filter, timeout, limit = 0) ⇒ Object

Performs a discovery of nodes matching the passed filter and returns an array of nodes.

An integer limit can be supplied; discovery is then cancelled as soon as it has reached the requested number of hosts.



117
118
119
# File 'lib/mcollective/client.rb', line 117

# Delegates discovery to the discoverer and returns whatever it returns.
#
# filter  - filter passed through to the discoverer.
# timeout - timeout passed through to the discoverer.
# limit   - limit passed through to the discoverer (default 0).
def discover(filter, timeout, limit=0)
  @discoverer.discover(filter, timeout, limit)
end

#discovered_req(body, agent, options = false) ⇒ Object



235
236
237
# File 'lib/mcollective/client.rb', line 235

# Removed API kept only as a stub: every call raises a RuntimeError telling
# the caller to move to the SimpleRPC framework. The arguments are ignored.
def discovered_req(body, agent, options=false)
  raise RuntimeError, "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end

#display_stats(stats, options = false, caption = "stomp call summary") ⇒ Object

Prints out the stats returned from req and discovered_req in a nice way



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/mcollective/client.rb', line 240

# Prints a summary of a stats hash to standard output.
#
# stats   - hash with :responses and :blocktime; the verbose report also reads
#           :starttime, :discoverytime and :totaltime. :discovered and
#           :noresponsefrom are optional.
# options - options hash; when false the client's own options are used. Only
#           :verbose is consulted here.
# caption - heading used for the verbose report.
#
# With :verbose set a multi-line report is printed, otherwise a one-line
# summary. Entries in stats[:noresponsefrom] are then listed four per row.
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts("           Nodes: #{stats[:responses]}")
    end

    printf("      Start Time: %s\n", Time.at(stats[:starttime]))
    printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

  else
    if stats[:discovered]
      printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
    else
      printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
    end
  end

  # A stats hash without :noresponsefrom is treated as "everyone answered".
  unresponsive = stats[:noresponsefrom] || []

  unless unresponsive.empty?
    puts("\nNo response from:\n")

    # Wrap on the entry's position. Taking the modulo of the entry itself
    # formats the host name (String#%) and never triggers a line break.
    unresponsive.each_with_index do |host, index|
      puts if index > 0 && index % 4 == 0
      printf("%30s", host)
    end

    puts
  end
end

#receive(requestid = nil) ⇒ Object

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you’ve previously sent a request with that ID and now only want replies that match that ID. In that case the current connection will ignore all messages not directed at it and keep waiting until it finds a matching message.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/mcollective/client.rb', line 88

# Blocks until the connector delivers a reply whose request id equals
# requestid, and returns that reply.
#
# Each incoming message is marked as a :reply, told which request id is
# expected, and decoded. Messages that fail security validation or that carry
# a different request id are logged and skipped; the method then starts over
# and waits for the next message. There is no retry limit.
def receive(requestid = nil)
  incoming = @connection.receive
  incoming.type = :reply
  incoming.expected_msgid = requestid

  incoming.decode!
  unless incoming.requestid == requestid
    raise(MsgDoesNotMatchRequestID, "Message reqid #{incoming.requestid} does not match our reqid #{requestid}")
  end

  incoming
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID => mismatch
  Log.debug("Ignoring a message for some other client : #{mismatch.message}")
  retry
end

#req(body, agent = nil, options = false, waitfor = 0, &block) ⇒ Object

Send a request, performs the passed block for each response

times = req("status", "mcollectived", options, client) {|resp|

pp resp

}

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/mcollective/client.rb', line 129

# Sends a request and yields the payload of each response to the block.
#
# body    - a Message, or a payload to build a request from. A Message
#           supplies its own agent and options, and the number of its
#           discovered hosts becomes the number of responses to wait for.
# agent   - agent name (ignored when body is a Message).
# options - options hash; when given it replaces the client's options, also
#           those taken from a Message body.
# waitfor - number of responses to wait for; 0 means keep receiving until the
#           timeout (ignored when body is a Message).
#
# Returns the stats hash produced by #update_stat.
def req(body, agent=nil, options=false, waitfor=0, &block)
  if body.is_a?(Message)
    agent = body.agent
    # A Message without discovered hosts means "no fixed number of replies".
    # The fallback has to guard the collection itself: calling size on nil
    # raises before any `|| 0` could apply.
    discovered = body.discovered_hosts
    waitfor = discovered ? discovered.size : 0
    @options = body.options
  end

  @options = options if options
  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout]
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true
  hosts_responded = 0

  begin
    if threaded
      hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
    else
      hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
    end
  rescue Interrupt
    # Deliberately ignored: an interrupt just ends the wait early and the
    # stats are still reported (with hosts_responded left at 0).
  ensure
    # Always drop the reply subscription, whether or not the wait completed.
    unsubscribe(agent, :reply)
  end

  update_stat(stat, hosts_responded, request.requestid)
end

#sendreq(msg, agent, filter = {}) ⇒ Object

Sends a request and returns the generated request id, doesn’t wait for responses and doesn’t execute any passed in code blocks for responses



39
40
41
42
43
44
45
46
# File 'lib/mcollective/client.rb', line 39

# Builds a request through #createreq, publishes it and returns its request
# id. Nothing is received here and no block is run for replies.
#
# msg, agent and filter are passed unchanged to #createreq.
def sendreq(msg, agent, filter = {})
  outgoing = createreq(msg, agent, filter)

  summary = "Sending request #{outgoing.requestid} to the #{outgoing.agent} agent with ttl #{outgoing.ttl} in collective #{outgoing.collective}"
  Log.debug(summary)

  outgoing.publish
  outgoing.requestid
end

#start_publisher(request, publish_timeout) ⇒ Object

Starts the request publishing routine



191
192
193
194
195
196
197
198
199
200
201
# File 'lib/mcollective/client.rb', line 191

# Publishes the request, giving up once publish_timeout has elapsed.
#
# request         - object responding to requestid, agent, ttl, collective
#                   and publish.
# publish_timeout - value handed to Timeout.timeout.
#
# A publishing timeout is logged as a warning and not re-raised.
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")

  begin
    Timeout.timeout(publish_timeout) do
      announcement = "Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}"
      Log.debug(announcement)
      request.publish
    end
  rescue Timeout::Error
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end

#start_receiver(requestid, waitfor, timeout, &block) ⇒ Object

Starts the response receiver routine. Expected to return the number of received responses.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/mcollective/client.rb', line 205

# Receives replies for requestid and yields each reply's payload to the block.
#
# requestid - request id handed to #receive.
# waitfor   - number of replies to collect; 0 means keep receiving until the
#             timeout fires.
# timeout   - value handed to Timeout.timeout for the whole receive phase.
#
# At least one receive is always attempted. When the timeout fires before
# waitfor replies arrived, a warning is logged. Returns the number of replies
# that were received.
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  received = 0

  begin
    Timeout.timeout(timeout) do
      finished = false

      until finished
        reply = receive(requestid)
        yield reply.payload
        received += 1
        finished = !(waitfor == 0 || received < waitfor)
      end
    end
  rescue Timeout::Error
    Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{received}") if waitfor > received
  end

  received
end

#subscribe(agent, type) ⇒ Object



63
64
65
66
67
68
69
70
71
# File 'lib/mcollective/client.rb', line 63

# Subscribes to the given target type of an agent in the current collective,
# unless this client already recorded a subscription for that agent.
#
# agent - agent name; also the key under which the subscription is recorded.
# type  - target type (for example :reply).
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")

  Util.subscribe(target)
  @subscriptions[agent] = 1
end

#threaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher in threads. This is activated when the ‘threaded’ option is set.



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/mcollective/client.rb', line 170

# Runs the publisher and the receiver in separate threads and returns what
# the receiver returns (the number of hosts that responded).
#
# Only the receiver thread is waited for; the publisher thread is started and
# left to finish on its own.
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")
  Thread.new { start_publisher(request, publish_timeout) }

  # Publishing and receiving overlap here, so the receiver is given the
  # publish timeout on top of the agent timeout. Otherwise it could time out
  # before publishing has finished when publish_timeout >= timeout.
  receive_window = publish_timeout + timeout

  # Thread#value joins the receiver and hands back the block's result.
  Thread.new { start_receiver(request.requestid, waitfor, receive_window, &block) }.value
end

#unsubscribe(agent, type) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/mcollective/client.rb', line 73

# Drops the subscription for an agent's target type in the current
# collective. Does nothing when no subscription is recorded for that agent.
#
# agent - agent name; the key under which the subscription was recorded.
# type  - target type (for example :reply).
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")

  Util.unsubscribe(target)
  @subscriptions.delete(agent)
end

#unthreaded_req(request, publish_timeout, timeout, waitfor, &block) ⇒ Object

Starts the client receiver and publisher unthreaded. This is the default client behaviour.



162
163
164
165
# File 'lib/mcollective/client.rb', line 162

# Publishes the request and then receives the replies, one step after the
# other in the calling thread. Returns what #start_receiver returns.
def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
  start_publisher(request, publish_timeout)

  awaited_id = request.requestid
  start_receiver(awaited_id, waitfor, timeout, &block)
end

#update_stat(stat, hosts_responded, requestid) ⇒ Object



225
226
227
228
229
230
231
232
233
# File 'lib/mcollective/client.rb', line 225

# Completes a stats hash after a request has finished, stores it in @stats
# and returns it.
#
# stat            - hash holding at least :starttime and :discoverytime; it
#                   is modified in place.
# hosts_responded - stored as :responses.
# requestid       - stored as :requestid.
#
# :totaltime is the time elapsed since :starttime, :blocktime is that minus
# :discoverytime, and :noresponsefrom is reset to an empty array.
def update_stat(stat, hosts_responded, requestid)
  elapsed = Time.now.to_f - stat[:starttime]

  stat.update(
    :totaltime => elapsed,
    :blocktime => elapsed - stat[:discoverytime],
    :responses => hosts_responded,
    :noresponsefrom => [],
    :requestid => requestid
  )

  @stats = stat
end