Class: MCollective::Connector::Rabbitmq

Inherits:
Base
  • Object
show all
Defined in:
lib/mcollective/connector/rabbitmq.rb

Defined Under Namespace

Classes: EventLogger

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

inherited

Constructor Details

#initializeRabbitmq

Returns a new instance of Rabbitmq.



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/mcollective/connector/rabbitmq.rb', line 96

# Sets up connector state from the MCollective configuration; no network
# connection is made here.
def initialize
  @config = Config.instance
  # ids of the subscriptions that have been made so far
  @subscriptions = []
  # off until #connect reads plugin.rabbitmq.base64
  @base64 = false
  # reconnect back-off tuning, each overridable through plugin.rabbitmq.*
  @use_exponential_back_off = get_bool_option("rabbitmq.use_exponential_back_off", "true")
  @initial_reconnect_delay = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
  @back_off_multiplier = Integer(get_option("rabbitmq.back_off_multiplier", 2))
  @max_reconnect_delay = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
  # delay that the next call to #exponential_back_off will hand out
  @reconnect_delay = @initial_reconnect_delay

  Log.info("RabbitMQ connector initialized.  Using stomp-gem #{stomp_version}")
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



6
7
8
# File 'lib/mcollective/connector/rabbitmq.rb', line 6

# Returns the current value of @connection, which is nil until #connect has
# stored a connection and again after #disconnect.
def connection
  @connection
end

Instance Method Details

#connect(connector = ::Stomp::Connection) ⇒ Object

Connects to the RabbitMQ middleware



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/mcollective/connector/rabbitmq.rb', line 110

# Connects to the RabbitMQ middleware.
#
# Builds a pool of hosts from the plugin.rabbitmq.pool.* options (optionally
# prompting on STDIN for a user name and/or password), adds the STOMP
# connection settings and passes the resulting hash to +connector.new+.  The
# object that comes back is stored in @connection.
#
# connector - class used to create the connection; it is called as
#             connector.new(params_hash).  Defaults to ::Stomp::Connection.
#
# Returns nil without doing anything when @connection is already set,
# otherwise the newly created connection.
#
# Raises ClientTimeoutError unchanged; every other exception is re-raised as
# a RuntimeError whose message starts with "Could not connect to RabbitMQ Server: ".
def connect(connector = ::Stomp::Connection)
  if @connection
    Log.debug("Already connected, not re-initializing connection")
    return
  end

  begin
    @base64 = get_bool_option("rabbitmq.base64", "false")

    pools = Integer(get_option("rabbitmq.pool.size"))
    hosts = []
    middleware_user = ''
    middleware_password = ''
    prompt_for_username = get_bool_option("rabbitmq.prompt_user", "false")
    prompt_for_password = get_bool_option("rabbitmq.prompt_password", "false")
    if prompt_for_username
      Log.debug("No previous user exists and rabbitmq.prompt_user is set to true")
      print "Please enter user to connect to middleware: "
      middleware_user = STDIN.gets.chomp
    end

    if prompt_for_password
      Log.debug("No previous password exists and rabbitmq.prompt_password is set to true")
      middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
      print "\n"
    end

    1.upto(pools) do |poolnum|
      host = {}

      host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
      host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
      host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false")

      # the environment and the config file win over the prompted user name,
      # which is only used as the default
      host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user", middleware_user)
      if prompt_for_username && host[:login] != middleware_user
        Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
                  "plugin.rabbitmq.prompt_user should be set to false to remove the prompt.")
      end

      # same precedence for the password
      host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password", middleware_password)
      if prompt_for_password && host[:passcode] != middleware_password
        Log.info("Using password from config file to connect to #{host[:host]}. "+
                  "plugin.rabbitmq.prompt_password should be set to false to remove the prompt.")
      end

      # if ssl is enabled replace the boolean with whatever #ssl_parameters
      # returns for this pool
      if host[:ssl]
        host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false"))
      end

      Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
      hosts << host
    end

    raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0

    connection = {:hosts => hosts}

    # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
    # these can be guessed, the documentation isn't clear
    connection[:use_exponential_back_off] = @use_exponential_back_off
    connection[:initial_reconnect_delay] = @initial_reconnect_delay
    connection[:back_off_multiplier] = @back_off_multiplier
    connection[:max_reconnect_delay] = @max_reconnect_delay
    connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
    connection[:randomize] = get_bool_option("rabbitmq.randomize", "false")
    connection[:backup] = get_bool_option("rabbitmq.backup", "false")

    connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
    connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
    connection[:reliable] = true
    connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 0))
    connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2))

    connection[:connect_headers] = connection_headers

    connection[:logger] = EventLogger.new

    @connection = connector.new(connection)

  rescue ClientTimeoutError => e
    raise e
  # NOTE(review): rescuing Exception also converts Interrupt/SystemExit into a
  # RuntimeError; kept as-is because callers may rely on the wrapped error.
  rescue Exception => e
    raise("Could not connect to RabbitMQ Server: #{e}")
  end
end

#connection_headersObject



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/mcollective/connector/rabbitmq.rb', line 200

# Builds the headers sent with the STOMP CONNECT frame.
#
# Always sets :host from plugin.rabbitmq.vhost (default "/").  When
# plugin.rabbitmq.heartbeat_interval is greater than 0 a "heart-beat" header
# is added (the interval is raised to at least 30 seconds) and
# "accept-version" becomes "1.1,1.0", or just "1.1" when
# plugin.rabbitmq.stomp_1_0_fallback is false.  Without a heartbeat the
# accept-version stays "1.0".
#
# Returns the headers Hash.
# Raises RuntimeError when a heartbeat is requested but
# stomp_version_supports_heartbeat? is false.
def connection_headers
  headers = {:"accept-version" => "1.0"}

  heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0))
  stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true)

  headers[:host] = get_option("rabbitmq.vhost", "/")

  if heartbeat_interval > 0
    unless stomp_version_supports_heartbeat?
      raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
    end

    if heartbeat_interval < 30
      # interpolate the configured value; the placeholder used to be logged verbatim
      Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s" % heartbeat_interval)
      heartbeat_interval = 30
    end

    # the header carries milliseconds; the two values are offset by half a
    # second either side of the configured interval
    heartbeat_interval = heartbeat_interval * 1000
    headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]

    if stomp_1_0_fallback
      headers[:"accept-version"] = "1.1,1.0"
    else
      headers[:"accept-version"] = "1.1"
    end
  else
    if stomp_version_supports_heartbeat?
      Log.info("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval")
    end
  end

  headers
end

#disconnectObject

Disconnects from the RabbitMQ connection



481
482
483
484
485
# File 'lib/mcollective/connector/rabbitmq.rb', line 481

# Disconnects from the RabbitMQ connection and clears @connection.
#
# Safe to call when no connection exists (before #connect, or twice in a
# row): in that case only the debug line is logged.
def disconnect
  Log.debug("Disconnecting from RabbitMQ")
  # guard against nil so a repeated or early disconnect is a no-op
  @connection.disconnect if @connection
  @connection = nil
end

#exponential_back_offObject

Calculate the exponential backoff needed



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/mcollective/connector/rabbitmq.rb', line 294

# Hands out the current reconnect delay and advances the stored delay.
#
# Returns nil when exponential back-off is disabled.  Otherwise returns the
# value @reconnect_delay had on entry, after multiplying the stored value by
# @back_off_multiplier and capping it at @max_reconnect_delay for next time.
def exponential_back_off
  return nil unless @use_exponential_back_off

  current_delay = @reconnect_delay

  # prepare the delay for the following call, never exceeding the maximum
  @reconnect_delay = current_delay * @back_off_multiplier
  @reconnect_delay = @max_reconnect_delay if @reconnect_delay > @max_reconnect_delay

  current_delay
end

#get_bool_option(val, default) ⇒ Object

looks up a boolean value in the config



510
511
512
# File 'lib/mcollective/connector/rabbitmq.rb', line 510

# Looks up a boolean value in the plugin configuration.
#
# val     - plugin option name
# default - value used when the option is not configured
#
# Returns whatever Util.str_to_bool makes of the configured (or default) value.
def get_bool_option(val, default)
  raw_value = @config.pluginconf.fetch(val, default)
  Util.str_to_bool(raw_value)
end

#get_cert_file(poolnum) ⇒ Object

Returns the name of the certificate file used by RabbitMQ. Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists, where X is the RabbitMQ pool number. If the environment variable doesn’t exist, it will try to load the value from the config.



289
290
291
# File 'lib/mcollective/connector/rabbitmq.rb', line 289

# Returns the certificate file configured for the given pool.
#
# The environment variable MCOLLECTIVE_RABBITMQ_POOL<poolnum>_SSL_CERT wins
# when it is set; otherwise the plugin option rabbitmq.pool.<poolnum>.ssl.cert
# is looked up with a default of false.
def get_cert_file(poolnum)
  env_name = "MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum
  from_env = ENV[env_name]
  return from_env if from_env

  get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false)
end

#get_env_or_option(env, opt, default = nil) ⇒ Object

looks in the environment first then in the config file for a specific option, accepts an optional default.

raises an exception when it can't find a value anywhere



491
492
493
494
495
496
497
# File 'lib/mcollective/connector/rabbitmq.rb', line 491

# Looks in the environment first, then in the plugin configuration, for a
# specific option; accepts an optional default.
#
# env     - name of the environment variable to check first
# opt     - plugin option name to check second
# default - value returned when neither is set; any non-nil value counts,
#           including false, matching #get_option
#
# Raises RuntimeError when no value is found and no default was given.
def get_env_or_option(env, opt, default=nil)
  return ENV[env] if ENV.include?(env)
  return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
  # only nil means "no default"; a false default is a legitimate value
  return default unless default.nil?

  raise("No #{env} environment or plugin.#{opt} configuration option given")
end

#get_key_file(poolnum) ⇒ Object

Returns the name of the private key file used by RabbitMQ. Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists, where X is the RabbitMQ pool number. If the environment variable doesn’t exist, it will try to load the value from the config.



281
282
283
# File 'lib/mcollective/connector/rabbitmq.rb', line 281

# Returns the private key file configured for the given pool.
#
# The environment variable MCOLLECTIVE_RABBITMQ_POOL<poolnum>_SSL_KEY wins
# when it is set; otherwise the plugin option rabbitmq.pool.<poolnum>.ssl.key
# is looked up with a default of false.
def get_key_file(poolnum)
  env_name = "MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum
  from_env = ENV[env_name]
  return from_env if from_env

  get_option("rabbitmq.pool.#{poolnum}.ssl.key", false)
end

#get_option(opt, default = nil) ⇒ Object

looks for a config option, accepts an optional default

raises an exception when it can't find a value anywhere



502
503
504
505
506
507
# File 'lib/mcollective/connector/rabbitmq.rb', line 502

# Looks for a plugin configuration option, accepting an optional default.
#
# opt     - plugin option name
# default - returned when the option is not configured; any non-nil value
#           (including false) counts as a default
#
# Raises RuntimeError when the option is missing and no default was given.
def get_option(opt, default=nil)
  settings = @config.pluginconf

  if settings.include?(opt)
    settings[opt]
  elsif !default.nil?
    default
  else
    raise("No plugin.#{opt} configuration option given")
  end
end

#make_target(agent, type, collective, reply_to = nil, node = nil) ⇒ Object



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# File 'lib/mcollective/connector/rabbitmq.rb', line 386

# Builds the destination description for a subscription or a publish.
#
# agent      - agent name
# type       - one of :directed, :broadcast, :reply, :request, :direct_request
# collective - collective name; must be listed in @config.collectives
# reply_to   - explicit reply destination; when nil one is derived from
#              plugin.rabbitmq.use_reply_exchange
# node       - target identity, required for :direct_request
#
# Returns a Hash with :name (destination), :headers (may carry "reply-to")
# and :id (subscription id, nil for :direct_request).
# Raises RuntimeError for an unknown type, an unknown collective, or a
# :direct_request without a node.
def make_target(agent, type, collective, reply_to=nil, node=nil)
  known_types = [:directed, :broadcast, :reply, :request, :direct_request]
  raise("Unknown target type #{type}") unless known_types.include?(type)

  unless @config.collectives.include?(collective)
    raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'")
  end

  agents_multiplex = get_bool_option("rabbitmq.agents_multiplex", "false")

  # an explicit reply_to always wins; otherwise pick the reply exchange or a
  # temp queue depending on configuration
  reply_path =
    if reply_to
      reply_to
    elsif get_bool_option("rabbitmq.use_reply_exchange", false)
      "/exchange/mcollective_reply/%s_%s_%s" % [ @config.identity, $$, Client.request_sequence ]
    else
      "/temp-queue/mcollective_reply_%s" % agent
    end

  name = ""
  headers = {}
  id = nil

  case type
  when :reply # receiving replies
    name = reply_path
    id = "mcollective_%s_replies" % agent

  when :broadcast, :request # publishing a request to all nodes with an agent
    if agents_multiplex
      name = "/exchange/%s_broadcast" % collective
      id = "%s_broadcast" % collective
    else
      name = "/exchange/%s_broadcast/%s" % [collective, agent]
      id = "%s_broadcast_%s" % [collective, agent]
    end
    headers["reply-to"] = reply_path

  when :direct_request # a request to a specific node
    raise "Directed requests need to have a node identity" unless node

    name = "/exchange/%s_directed/%s" % [ collective, node]
    headers["reply-to"] = reply_path

  when :directed # subscribing to directed messages
    name = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
    id = "%s_%s_directed_to_identity" % [ collective, @config.identity ]
  end

  {:name => name, :headers => headers, :id => id}
end

#publish(msg) ⇒ Object

Sends a message to the RabbitMQ connection



345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/mcollective/connector/rabbitmq.rb', line 345

# Sends a message to the RabbitMQ connection.
#
# The message is base64 encoded first when @base64 is set.  A
# :direct_request is published once per entry in msg.discovered_hosts, each
# to its own target; every other message type is published once to the
# target returned by #target_for.
def publish(msg)
  msg.base64_encode! if @base64

  unless msg.type == :direct_request
    destination = target_for(msg)

    Log.debug("Sending a broadcast message to RabbitMQ target '#{destination[:name]}' with headers '#{destination[:headers].inspect}'")

    return @connection.publish(destination[:name], msg.payload, destination[:headers])
  end

  msg.discovered_hosts.each do |host|
    destination = target_for(msg, host)

    Log.debug("Sending a direct message to RabbitMQ target '#{destination[:name]}' with headers '#{destination[:headers].inspect}'")

    @connection.publish(destination[:name], msg.payload, destination[:headers])
  end
end

#receiveObject

Receives a message from the RabbitMQ connection



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/mcollective/connector/rabbitmq.rb', line 313

# Receives a message from the RabbitMQ connection.
#
# Returns a Message built from the received frame's body and headers.
# Raises MessageNotReceived when the connection hands back nil,
# RuntimeError when the body mentions "Processing error", and
# UnexpectedMessageType for any frame whose command is not 'MESSAGE'.  The
# two custom errors are constructed with the value of #exponential_back_off.
def receive
  Log.debug("Waiting for a message from RabbitMQ")

  # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
  # handling it sets the connection to closed.  If we happen to be receiving at just
  # that time we will get an exception warning about the closed connection so handling
  # that here with a sleep and a retry (there is no limit on the retries).
  frame =
    begin
      @connection.receive
    rescue ::Stomp::Error::NoCurrentConnection
      sleep 1
      retry
    end

  # In older stomp gems an attempt to receive after failed authentication can return nil
  raise MessageNotReceived.new(exponential_back_off), "No message received from RabbitMQ." if frame.nil?

  if frame.body =~ /Processing error/
    raise "Received a processing error from RabbitMQ: '%s'" % frame.body.chomp
  end

  # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
  unless frame.command == 'MESSAGE'
    Log.debug("Unexpected '#{frame.command}' frame.  Headers: #{frame.headers.inspect} Body: #{frame.body.inspect}")
    raise UnexpectedMessageType.new(exponential_back_off),
      "Received frame of type '#{frame.command}' expected 'MESSAGE'"
  end

  Message.new(frame.body, frame, :base64 => @base64, :headers => frame.headers)
end

#ssl_parameters(poolnum, fallback) ⇒ Object

Sets the SSL parameters for a specific connection



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/mcollective/connector/rabbitmq.rb', line 244

# Sets the SSL parameters for a specific connection.
#
# poolnum  - pool number whose ssl.* options are read
# fallback - when true, a failure to build verified SSL settings is logged
#            as a warning and true is returned instead of raising
#
# Returns a ::Stomp::SSLParams built from the cert, key, CA and cipher
# settings, or true when falling back.
# Raises the original error when verification cannot be set up and fallback
# is false.  Only StandardError is handled here, so Interrupt, SystemExit and
# similar are never swallowed by the fallback.
def ssl_parameters(poolnum, fallback)
  params = {
    :cert_file => get_cert_file(poolnum),
    :key_file  => get_key_file(poolnum),
    :ts_files  => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false),
    :ciphers   => get_option("rabbitmq.pool.#{poolnum}.ssl.ciphers", false),
  }

  raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]

  raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
  raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])

  # the CA option may list several files separated by commas
  params[:ts_files].split(",").each do |ca|
    raise "Cannot find CA file #{ca}" unless File.exist?(ca)
  end

  begin
    ::Stomp::SSLParams.new(params)
  rescue NameError
    raise "Stomp gem >= 1.2.2 is needed"
  end

rescue StandardError => e
  if fallback
    Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
    true
  else
    Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
    raise
  end
end

#stomp_versionObject



235
236
237
# File 'lib/mcollective/connector/rabbitmq.rb', line 235

# Returns the version string reported by the loaded stomp gem
# (the ::Stomp::Version::STRING constant).
def stomp_version
  ::Stomp::Version::STRING
end

#stomp_version_supports_heartbeat?Boolean

Returns:

  • (Boolean)


239
240
241
# File 'lib/mcollective/connector/rabbitmq.rb', line 239

# Tells whether the loaded stomp gem is at least version 1.2.10, the minimum
# this connector requires before it will send heartbeat headers.
#
# Returns true or false.
def stomp_version_supports_heartbeat?
  comparison = Util.versioncmp(stomp_version, "1.2.10")
  comparison >= 0
end

#subscribe(agent, type, collective) ⇒ Object

Subscribe to a topic or queue



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'lib/mcollective/connector/rabbitmq.rb', line 434

# Subscribe to a topic or queue.
#
# Does nothing for :reply subscriptions unless
# plugin.rabbitmq.use_reply_exchange is enabled, and nothing when the
# target's id is already in @subscriptions.  A DuplicateSubscription error
# from the STOMP library is logged and otherwise ignored.
def subscribe(agent, type, collective)
  # On rabbitmq if you send a message with a reply-to: header set to
  # '/temp-queue/*' it automatically creates a private queue, munges
  # the reply-to: header to point to this private queue, and
  # subscribes you to it.  As such you should never attempt to
  # SUBSCRIBE or UNSUBSCRIBE to '/temp-queue/*' directly as that'll
  # cause great pain and suffering.
  # https://www.rabbitmq.com/stomp.html#d.tqd
  #
  # The exception to this is in 'use_reply_exchange' mode, when the
  # reply-to will be set to a queue in an explicit exchange.
  return if type == :reply && !get_bool_option("rabbitmq.use_reply_exchange", false)

  target = make_target(agent, type, collective)
  return if @subscriptions.include?(target[:id])

  Log.debug("Subscribing to #{target[:name]} with headers #{target[:headers].inspect.chomp}")
  @connection.subscribe(target[:name], target[:headers], target[:id])
  @subscriptions << target[:id]
rescue ::Stomp::Error::DuplicateSubscription
  Log.error("Received subscription request for #{target.inspect.chomp} but already had a matching subscription, ignoring")
end

#target_for(msg, node = nil) ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/mcollective/connector/rabbitmq.rb', line 365

# Builds the publish target for a message.
#
# msg  - the message being published; :reply messages go to the "reply-to"
#        header of the request they answer, :request and :direct_request
#        messages get their target from #make_target
# node - identity of the destination node, used for :direct_request
#
# Returns the target Hash with "expiration" and "mc_sender" headers added.
# Raises RuntimeError for any other message type.
def target_for(msg, node=nil)
  if msg.type == :reply
    target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}

  elsif [:request, :direct_request].include?(msg.type)
    target = make_target(msg.agent, msg.type, msg.collective, msg.reply_to, node)

  else
    raise "Don't know how to create a target for message type #{msg.type}"

  end

  # marks messages as valid for ttl + 10 seconds, we do this here
  # rather than in make_target as this should only be set on publish
  target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s

  target[:headers]["mc_sender"] = Config.instance.identity

  target
end

#unsubscribe(agent, type, collective) ⇒ Object

Unsubscribe from a topic or queue



464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# File 'lib/mcollective/connector/rabbitmq.rb', line 464

# Unsubscribe from a topic or queue.
#
# Does nothing for :reply subscriptions unless
# plugin.rabbitmq.use_reply_exchange is enabled, because '/temp-queue/*'
# destinations must never be unsubscribed from directly (the same rule that
# applies when subscribing).  Otherwise unsubscribes the target built by
# #make_target and removes its id from @subscriptions.
def unsubscribe(agent, type, collective)
  return if type == :reply && !get_bool_option("rabbitmq.use_reply_exchange", false)

  target = make_target(agent, type, collective)

  Log.debug("Unsubscribing from #{target[:name]}")
  @connection.unsubscribe(target[:name], target[:headers], target[:id])
  @subscriptions.delete(target[:id])
end