Class: MCollective::Connector::Activemq

Inherits:
Base
  • Object
Defined in:
lib/mcollective/connector/activemq.rb

Overview

Handles sending and receiving messages over the Stomp protocol, specifically for ActiveMQ servers. It takes advantage of ActiveMQ-specific features and enhancements to the Stomp protocol. For best results in a clustered environment, use at least ActiveMQ 5.5.0.

This plugin takes an entirely different approach to dealing with ActiveMQ from the more generic stomp connector.

- Agents use /topic/<collective>.<agent>.agent
- Replies use temp-topics so they are private and transient.
- Point-to-point messages are supported by subscribing to
  /queue/<collective>.nodes with a selector "mc_identity = 'identity'"

The use of temp-topics for the replies is a huge improvement over the old style. In the old way, all clients received the replies for every client that was active at the time, so each client had to decrypt and validate every message just to determine whether to ignore it. This was computationally expensive, and on large busy networks the messages were being sent all over the place across broker boundaries.

The new way means the replies go point-to-point back to whoever made the request. Clients only get their own replies, and this is a private channel that casual observers cannot just snoop into.

This plugin supports version 1.1.6 and newer of the Stomp rubygem.

connector = activemq
plugin.activemq.pool.size = 2

plugin.activemq.pool.1.host = stomp1.your.net
plugin.activemq.pool.1.port = 61613
plugin.activemq.pool.1.user = you
plugin.activemq.pool.1.password = secret
plugin.activemq.pool.1.ssl = true
plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
plugin.activemq.pool.1.ssl.key = /path/to/your.key
plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
plugin.activemq.pool.1.ssl.fallback = true
plugin.activemq.pool.1.ssl.ciphers = TLSv1:!MD5:!LOW:!EXPORT

plugin.activemq.pool.2.host = stomp2.your.net
plugin.activemq.pool.2.port = 61613
plugin.activemq.pool.2.user = you
plugin.activemq.pool.2.password = secret
plugin.activemq.pool.2.ssl = false

Using this method you can supply just the STOMP_USER and STOMP_PASSWORD environment variables instead of the user and password settings. The port will default to 61613 if not specified.
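As a rough illustration of how the pool settings above could be resolved, the sketch below builds a host list from a plain Hash standing in for the plugin configuration. The build_hosts helper and its env parameter are hypothetical names introduced for this example; only the option keys, the STOMP_USER and STOMP_PASSWORD variables and the 61613 default come from this page.

```ruby
# Hypothetical helper: resolves pool entries from a config Hash.
# `conf` stands in for the plugin configuration, `env` for ENV.
def build_hosts(conf, env = ENV)
  size = Integer(conf.fetch("activemq.pool.size"))

  (1..size).map do |n|
    prefix = "activemq.pool.#{n}"
    {
      :host     => conf.fetch("#{prefix}.host"),
      # the port falls back to 61613 when it is not configured
      :port     => conf.fetch("#{prefix}.port", 61613).to_i,
      # environment variables win over the config file
      :login    => env["STOMP_USER"] || conf.fetch("#{prefix}.user"),
      :passcode => env["STOMP_PASSWORD"] || conf.fetch("#{prefix}.password"),
    }
  end
end

conf = {
  "activemq.pool.size"   => "2",
  "activemq.pool.1.host" => "stomp1.your.net",
  "activemq.pool.1.port" => "61614",
  "activemq.pool.2.host" => "stomp2.your.net",
}
env = {"STOMP_USER" => "you", "STOMP_PASSWORD" => "secret"}

hosts = build_hosts(conf, env)
hosts.each { |h| puts "#{h[:login]}@#{h[:host]}:#{h[:port]}" }
```

Passing env explicitly keeps the example reproducible; with the default argument the real process environment would be consulted instead.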

The ssl options are only usable with version 1.2.2 or newer of the Stomp gem. When they are set, full SSL validation is done and you’ll only be able to connect to an ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true and do not supply the cert, key and ca properties, or if you have an older gem, it will fall back to unverified mode only if ssl.fallback is true.

In addition you can set the following options for the rubygem:

plugin.activemq.initial_reconnect_delay = 0.01
plugin.activemq.max_reconnect_delay = 30.0
plugin.activemq.use_exponential_back_off = true
plugin.activemq.back_off_multiplier = 2
plugin.activemq.max_reconnect_attempts = 0
plugin.activemq.randomize = false
plugin.activemq.timeout = -1

You can set the initial connection timeout, which applies when your stomp server is simply unreachable. After this timeout the connector fails over to the next server in the pool:

plugin.activemq.connect_timeout = 30

ActiveMQ JMS message priorities can be set:

plugin.activemq.priority = 4

This plugin supports Stomp protocol 1.1 when combined with the stomp gem version 1.2.10 or newer. To enable network heartbeats, which help keep the connection alive over NAT connections and aggressive session tracking firewalls, you can set:

plugin.activemq.heartbeat_interval = 30

which will cause a heartbeat to be sent at 30 second intervals and one to be expected from the broker every 30 seconds. The shortest supported period is 30 seconds; if you set it lower it will be forced to 30 seconds.

After 2 failures to receive a heartbeat the connection will be reset via the normal failover mechanism.

By default, if heartbeat_interval is set it will request Stomp 1.1 but support fallback to 1.0. You can enable strict Stomp 1.1 only operation:

plugin.activemq.stomp_1_0_fallback = 0
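The heartbeat and fallback settings end up as STOMP CONNECT headers. The following is a minimal sketch of that mapping, modelled on connection_headers further down this page; heartbeat_headers is a hypothetical stand-alone helper and leaves out the gem version check and the vhost header.

```ruby
# Hypothetical helper: builds STOMP CONNECT headers from the two options above.
def heartbeat_headers(interval_s, allow_1_0_fallback = true)
  # without a heartbeat interval only STOMP 1.0 is requested
  return {:"accept-version" => "1.0"} unless interval_s > 0

  interval_s = 30 if interval_s < 30 # shortest supported period
  ms = interval_s * 1000

  {
    :"accept-version" => allow_1_0_fallback ? "1.1,1.0" : "1.1",
    # first value: what the client offers to send, second value: what it
    # wants to receive, both in milliseconds and offset by 500ms
    :"heart-beat"     => "%d,%d" % [ms + 500, ms - 500],
  }
end

p heartbeat_headers(30)
p heartbeat_headers(5, false)
p heartbeat_headers(0)
```

An interval of 5 is raised to the 30 second minimum, so the second call produces the same heart-beat value as the first.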

Defined Under Namespace

Classes: DummyError, EventLogger

Methods inherited from Base

inherited

Constructor Details

#initialize ⇒ Activemq

Returns a new instance of Activemq.



# File 'lib/mcollective/connector/activemq.rb', line 200

def initialize
  @config = Config.instance
  @subscriptions = []
  @msgpriority = 0
  @base64 = false
  @use_exponential_back_off = get_bool_option("activemq.use_exponential_back_off", "true")
  @initial_reconnect_delay = Float(get_option("activemq.initial_reconnect_delay", 0.01))
  @back_off_multiplier = Integer(get_option("activemq.back_off_multiplier", 2))
  @max_reconnect_delay = Float(get_option("activemq.max_reconnect_delay", 30.0))
  @reconnect_delay = @initial_reconnect_delay

  Log.info("ActiveMQ connector initialized.  Using stomp-gem #{stomp_version}")
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



# File 'lib/mcollective/connector/activemq.rb', line 97

def connection
  @connection
end

Instance Method Details

#connect(connector = ::Stomp::Connection) ⇒ Object

Connects to the ActiveMQ middleware



# File 'lib/mcollective/connector/activemq.rb', line 215

def connect(connector = ::Stomp::Connection)
  if @connection
    Log.debug("Already connected, not re-initializing connection")
    return
  end

  begin
    @base64 = get_bool_option("activemq.base64", "false")
    @msgpriority = get_option("activemq.priority", 0).to_i

    pools = Integer(get_option("activemq.pool.size"))
    hosts = []
    middleware_user = ''
    middleware_password = ''
    prompt_for_username = get_bool_option("activemq.prompt_user", "false")
    prompt_for_password = get_bool_option("activemq.prompt_password", "false")
    
    if prompt_for_username
      Log.debug("No previous user exists and activemq.prompt_user is set to true")
      print "Please enter user to connect to middleware: "
      middleware_user = STDIN.gets.chomp
    end

    if prompt_for_password
      Log.debug("No previous password exists and activemq.prompt_password is set to true")
      middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
      print "\n"
    end

    1.upto(pools) do |poolnum|
      host = {}

      host[:host] = get_option("activemq.pool.#{poolnum}.host")
      host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
      host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")
      
      # read user from config file
      host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user", middleware_user)
      if prompt_for_username and host[:login] != middleware_user
          Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
                  "plugin.activemq.prompt_user should be set to false to remove the prompt.")
      end
      
      # read password from config file
      host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password", middleware_password)
      if prompt_for_password and host[:passcode] != middleware_password
          Log.info("Using password from config file to connect to #{host[:host]}. "+
                  "plugin.activemq.prompt_password should be set to false to remove the prompt.")
      end

      # if ssl is enabled set :ssl to the hash of parameters
      if host[:ssl]
        host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false"))
      end

      Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
      hosts << host
    end

    raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0

    connection = {:hosts => hosts}

    # Various STOMP gem options. The defaults here match the defaults for 1.1.6;
    # their meaning has to be guessed because the documentation isn't clear
    connection[:use_exponential_back_off] = @use_exponential_back_off
    connection[:initial_reconnect_delay] = @initial_reconnect_delay
    connection[:back_off_multiplier] = @back_off_multiplier
    connection[:max_reconnect_delay] = @max_reconnect_delay
    connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
    connection[:randomize] = get_bool_option("activemq.randomize", "false")
    connection[:backup] = get_bool_option("activemq.backup", "false")
    connection[:timeout] = Integer(get_option("activemq.timeout", -1))
    connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
    connection[:reliable] = true
    connection[:connect_headers] = connection_headers
    connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 0))
    connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))

    connection[:logger] = EventLogger.new

    @connection = connector.new(connection)

  rescue ClientTimeoutError => e
    raise e
  rescue Exception => e
    raise("Could not connect to ActiveMQ Server: #{e}")
  end
end

#connection_headers ⇒ Object



# File 'lib/mcollective/connector/activemq.rb', line 313

def connection_headers
  headers = {:"accept-version" => "1.0"}

  heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
  stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)

  headers[:host] = get_option("activemq.vhost", "mcollective")

  if heartbeat_interval > 0
    unless stomp_version_supports_heartbeat?
      raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
    end

    if heartbeat_interval < 30
      Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s" % heartbeat_interval)
      heartbeat_interval = 30
    end

    heartbeat_interval = heartbeat_interval * 1000
    headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]

    if stomp_1_0_fallback
      headers[:"accept-version"] = "1.1,1.0"
    else
      headers[:"accept-version"] = "1.1"
    end
  else
    if stomp_version_supports_heartbeat?
      Log.info("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
    end
  end

  headers
end

#disconnect ⇒ Object

Disconnects from the ActiveMQ connection



# File 'lib/mcollective/connector/activemq.rb', line 506

def disconnect
  Log.debug("Disconnecting from ActiveMQ")
  @connection.disconnect
  @connection = nil
end

#exponential_back_off ⇒ Object

Calculate the exponential backoff needed



# File 'lib/mcollective/connector/activemq.rb', line 399

def exponential_back_off
  if !@use_exponential_back_off
    return nil
  end

  backoff = @reconnect_delay

  # calculate next delay
  @reconnect_delay = @reconnect_delay * @back_off_multiplier

  # cap at max reconnect delay
  if @reconnect_delay > @max_reconnect_delay
    @reconnect_delay = @max_reconnect_delay
  end

  return backoff
end
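To see what delays this produces, here is a small stand-alone model of the same state machine using the default values from the configuration section (0.01, 2 and 30.0). The BackOff class and next_delay name are hypothetical; the connector keeps this state in instance variables instead.

```ruby
# Minimal stand-alone model of the back-off state kept by the connector.
class BackOff
  def initialize(initial = 0.01, multiplier = 2, max = 30.0)
    @delay = initial
    @multiplier = multiplier
    @max = max
  end

  # Returns the current delay, then grows it (capped at max) for the next call.
  def next_delay
    current = @delay
    @delay = [@delay * @multiplier, @max].min
    current
  end
end

backoff = BackOff.new
delays = Array.new(14) { backoff.next_delay }
puts delays.inspect
```

The delay doubles on each call until it would exceed the maximum, after which every call returns the maximum.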

#get_bool_option(val, default) ⇒ Object

Looks up a boolean value in the config.



# File 'lib/mcollective/connector/activemq.rb', line 602

def get_bool_option(val, default)
  Util.str_to_bool(@config.pluginconf.fetch(val, default))
end

#get_cert_file(poolnum) ⇒ Object

Returns the name of the certificate file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try to load the value from the config.



# File 'lib/mcollective/connector/activemq.rb', line 394

def get_cert_file(poolnum)
  ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
end

#get_env_or_option(env, opt, default = nil) ⇒ Object

Looks in the environment first, then in the config file, for a specific option. Accepts an optional default.

Raises an exception when it can't find a value anywhere.



# File 'lib/mcollective/connector/activemq.rb', line 583

def get_env_or_option(env, opt, default=nil)
  return ENV[env] if ENV.include?(env)
  return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
  return default if default

  raise("No #{env} environment or plugin.#{opt} configuration option given")
end

#get_key_file(poolnum) ⇒ Object

Returns the name of the private key file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try to load the value from the config.



# File 'lib/mcollective/connector/activemq.rb', line 386

def get_key_file(poolnum)
  ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
end

#get_option(opt, default = nil) ⇒ Object

Looks for a config option. Accepts an optional default.

Raises an exception when it can't find a value anywhere.



# File 'lib/mcollective/connector/activemq.rb', line 594

def get_option(opt, default=nil)
  return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
  return default unless default.nil?

  raise("No plugin.#{opt} configuration option given")
end
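Because the default is tested with nil? rather than for truthiness, a default of false is returned as-is. That appears to be how the SSL lookups elsewhere on this page mark an option as optional. A minimal sketch of the behaviour, using a hypothetical lookup function and a plain Hash in place of the plugin configuration:

```ruby
# Hypothetical stand-in for get_option, with the config passed in explicitly.
def lookup(conf, opt, default = nil)
  return conf[opt] if conf.include?(opt)
  # only a nil default counts as "no default", so false is a valid default
  return default unless default.nil?

  raise "No plugin.#{opt} configuration option given"
end

conf = {"activemq.pool.1.host" => "stomp1.your.net"}

p lookup(conf, "activemq.pool.1.host")
p lookup(conf, "activemq.pool.1.ssl.ca", false)

begin
  lookup(conf, "activemq.pool.1.port")
rescue RuntimeError => e
  puts e.message
end
```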

#headers_for(msg, identity = nil) ⇒ Object



# File 'lib/mcollective/connector/activemq.rb', line 512

def headers_for(msg, identity=nil)
  headers = {}

  headers = {"priority" => @msgpriority} if @msgpriority > 0

  headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s

  # set the expires header based on the TTL, we build a small additional
  # timeout of 10 seconds in here to allow for network latency etc
  headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s

  if [:request, :direct_request].include?(msg.type)
    target = make_target(msg.agent, :reply, msg.collective)

    if msg.reply_to
      headers["reply-to"] = msg.reply_to
    else
      headers["reply-to"] = target[:name]
    end

    headers["mc_identity"] = identity if msg.type == :direct_request
  end

  headers["mc_sender"] = Config.instance.identity

  return headers
end
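The timestamp and expires headers are millisecond values derived from the current time and the message TTL. A minimal sketch of that arithmetic follows; expiry_headers is a hypothetical helper, and the clock is injectable only so that the result is reproducible.

```ruby
# Hypothetical helper: `now` is seconds since the epoch and can be injected.
def expiry_headers(ttl, now = Time.now.utc.to_i)
  {
    # both values are milliseconds, sent as strings
    "timestamp" => (now * 1000).to_s,
    # TTL plus a 10 second allowance for network latency
    "expires"   => ((now + ttl + 10) * 1000).to_s,
  }
end

headers = expiry_headers(60, 1_700_000_000)
p headers
```

With a TTL of 60 the message expires 70 seconds after the timestamp.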

#make_target(agent, type, collective) ⇒ Object



# File 'lib/mcollective/connector/activemq.rb', line 540

def make_target(agent, type, collective)
  raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
  raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)

  agents_multiplex = get_bool_option("activemq.agents_multiplex", "false")
  target = {:name => nil, :headers => {}}

  case type
    when :reply
      target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}", Client.request_sequence].join(".")

    when :broadcast
      if agents_multiplex
        target[:name] = ["/topic/" + collective, :agents].join(".")
      else
        target[:name] = ["/topic/" + collective, agent, :agent].join(".")
      end

    when :request
      if agents_multiplex
        target[:name] = ["/topic/" + collective, :agents].join(".")
      else
        target[:name] = ["/topic/" + collective, agent, :agent].join(".")
      end

    when :direct_request
      target[:name] = ["/queue/" + collective, :nodes].join(".")

    when :directed
      target[:name] = ["/queue/" + collective, :nodes].join(".")
      target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
      target[:id] = "%s_directed_to_identity" % collective
  end

  target[:id] = target[:name] unless target[:id]

  target
end
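The destination names this produces can be hard to picture from the join calls. The sketch below reproduces only the naming scheme, with the identity, process id and request sequence passed in as plain arguments; destination is a hypothetical helper and the agents_multiplex option is deliberately left out.

```ruby
# Hypothetical, simplified version of the naming scheme in make_target.
def destination(type, collective, agent: nil, identity: nil, pid: nil, sequence: nil)
  case type
  when :broadcast, :request
    "/topic/#{collective}.#{agent}.agent"
  when :directed, :direct_request
    "/queue/#{collective}.nodes"
  when :reply
    "/queue/#{collective}.reply.#{identity}_#{pid}.#{sequence}"
  else
    raise ArgumentError, "Unknown target type #{type}"
  end
end

puts destination(:request, "mcollective", agent: "rpcutil")
puts destination(:directed, "mcollective")
puts destination(:reply, "mcollective", identity: "node1", pid: 1234, sequence: 7)
```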

#publish(msg) ⇒ Object

Sends a message to the ActiveMQ connection



# File 'lib/mcollective/connector/activemq.rb', line 449

def publish(msg)
  msg.base64_encode! if @base64

  target = target_for(msg)

  if msg.type == :direct_request
    msg.discovered_hosts.each do |node|
      target[:headers] = headers_for(msg, node)

      Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

      @connection.publish(target[:name], msg.payload, target[:headers])
    end
  else
    target[:headers].merge!(headers_for(msg))

    Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

    @connection.publish(target[:name], msg.payload, target[:headers])
  end
end
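For direct requests the same payload is published once per discovered node, and only the mc_identity header differs between copies. A node subscribed with the selector "mc_identity = '<its identity>'" then receives only its own copy. The following sketch shows the fan-out against a recording stand-in for the connection; RecordingConnection and publish_direct are hypothetical names.

```ruby
# Stand-in for the Stomp connection that just records what was published.
class RecordingConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(destination, body, headers)
    @sent << [destination, body, headers]
  end
end

# Hypothetical helper: one copy per node, addressed via the mc_identity header.
def publish_direct(connection, collective, payload, nodes)
  nodes.each do |node|
    connection.publish("/queue/#{collective}.nodes", payload, {"mc_identity" => node})
  end
end

connection = RecordingConnection.new
publish_direct(connection, "mcollective", "payload", ["node1", "node2"])
connection.sent.each { |dest, _body, headers| puts "#{dest} #{headers.inspect}" }
```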

#receive ⇒ Object

Receives a message from the ActiveMQ connection



# File 'lib/mcollective/connector/activemq.rb', line 418

def receive
  Log.debug("Waiting for a message from ActiveMQ")

  # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
  # handling it sets the connection to closed.  If we happen to be receiving at just
  # that time we will get an exception warning about the closed connection so handling
  # that here with a sleep and a retry.
  begin
    msg = @connection.receive
  rescue ::Stomp::Error::NoCurrentConnection
    sleep 1
    retry
  end

  # In older stomp gems an attempt to receive after failed authentication can return nil
  if msg.nil?
    raise MessageNotReceived.new(exponential_back_off), "No message received from ActiveMQ."

  end

  # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
  if msg.command != 'MESSAGE'
    Log.warn("Unexpected '#{msg.command}' frame.  Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}")
    raise UnexpectedMessageType.new(exponential_back_off),
      "Received frame of type '#{msg.command}' expected 'MESSAGE'"
  end

  Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
end
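The sleep and retry handling at the top of this method can be tried in isolation. In the sketch below ConnectionClosed is a local stand-in for ::Stomp::Error::NoCurrentConnection and FlakyConnection is a hypothetical fake that fails a fixed number of times before returning a frame, so no broker or stomp gem is needed.

```ruby
# Local stand-in for ::Stomp::Error::NoCurrentConnection.
class ConnectionClosed < StandardError; end

# Fake connection that raises a given number of times before delivering.
class FlakyConnection
  attr_reader :attempts

  def initialize(failures, frame)
    @failures = failures
    @frame = frame
    @attempts = 0
  end

  def receive
    @attempts += 1
    if @failures > 0
      @failures -= 1
      raise ConnectionClosed
    end
    @frame
  end
end

# Keeps retrying while the connection reports itself closed mid reconnect.
def receive_with_retry(connection, pause = 1)
  begin
    connection.receive
  rescue ConnectionClosed
    sleep pause
    retry
  end
end

flaky = FlakyConnection.new(2, "MESSAGE frame")
frame = receive_with_retry(flaky, 0.01)
puts "#{frame} after #{flaky.attempts} attempts"
```

Note that this loop has no upper bound on retries; it relies on the underlying reliable connection eventually reconnecting.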

#ssl_parameters(poolnum, fallback) ⇒ Object

Sets the SSL parameters for a specific connection



# File 'lib/mcollective/connector/activemq.rb', line 349

def ssl_parameters(poolnum, fallback)
  params = {
    :cert_file => get_cert_file(poolnum),
    :key_file  => get_key_file(poolnum),
    :ts_files  => get_option("activemq.pool.#{poolnum}.ssl.ca", false),
    :ciphers   => get_option("activemq.pool.#{poolnum}.ssl.ciphers", false),
  }

  raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]

  raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
  raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])

  params[:ts_files].split(",").each do |ca|
    raise "Cannot find CA file #{ca}" unless File.exist?(ca)
  end

  begin
    ::Stomp::SSLParams.new(params)
  rescue NameError
    raise "Stomp gem >= 1.2.2 is needed"
  end

rescue Exception => e
  if fallback
    Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
    return true
  else
    Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
    raise(e)
  end
end
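The method-level rescue is what implements the fallback: any failure while assembling the verified parameters either becomes a plain true (unverified SSL) or is re-raised. A reduced sketch of that control flow is shown below. verified_or_fallback is a hypothetical helper, it checks only file existence, and it rescues StandardError rather than Exception as the original does.

```ruby
require "tempfile"

# Hypothetical helper: returns the file list for verified mode, or true
# (meaning plain unverified SSL) when fallback is allowed and a check fails.
def verified_or_fallback(files, fallback)
  missing = files.reject { |f| File.exist?(f) }
  raise "Cannot find #{missing.join(', ')}" unless missing.empty?

  files
rescue StandardError => e
  # re-raise the original error when falling back is not permitted
  raise unless fallback

  warn "Falling back to unverified SSL: #{e.message}"
  true
end

ca = Tempfile.new("ca")
p verified_or_fallback([ca.path], false)
p verified_or_fallback(["/no/such/ca.pem"], true)
```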

#stomp_version ⇒ Object



# File 'lib/mcollective/connector/activemq.rb', line 305

def stomp_version
  ::Stomp::Version::STRING
end

#stomp_version_supports_heartbeat? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/mcollective/connector/activemq.rb', line 309

def stomp_version_supports_heartbeat?
  return Util.versioncmp(stomp_version, "1.2.10") >= 0
end
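A dedicated version comparison is needed here because comparing version strings directly gives the wrong answer once a component reaches two digits. The sketch below makes the same check with Gem::Version from RubyGems, swapped in for Util.versioncmp so that it runs without MCollective; supports_heartbeat? is a hypothetical name.

```ruby
require "rubygems"

# Same threshold as above, using Gem::Version instead of Util.versioncmp.
def supports_heartbeat?(version)
  Gem::Version.new(version) >= Gem::Version.new("1.2.10")
end

["1.1.6", "1.2.9", "1.2.10", "1.3.2"].each do |version|
  puts "#{version}: #{supports_heartbeat?(version)}"
end

# Plain string comparison is misleading: "9" sorts after "1".
puts "1.2.9" >= "1.2.10"
```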

#subscribe(agent, type, collective) ⇒ Object

Subscribe to a topic or queue



# File 'lib/mcollective/connector/activemq.rb', line 472

def subscribe(agent, type, collective)
  source = make_target(agent, type, collective)

  unless @subscriptions.include?(source[:id])
    Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
    @connection.subscribe(source[:name], source[:headers], source[:id])
    @subscriptions << source[:id]
  end
rescue ::Stomp::Error::DuplicateSubscription
  Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
end

#target_for(msg) ⇒ Object



# File 'lib/mcollective/connector/activemq.rb', line 493

def target_for(msg)
  if msg.type == :reply
    target = {:name => msg.request.headers["reply-to"], :headers => {}}
  elsif [:request, :direct_request].include?(msg.type)
    target = make_target(msg.agent, msg.type, msg.collective)
  else
    raise "Don't know how to create a target for message type #{msg.type}"
  end

  return target
end

#unsubscribe(agent, type, collective) ⇒ Object

Unsubscribe from a topic or queue



# File 'lib/mcollective/connector/activemq.rb', line 485

def unsubscribe(agent, type, collective)
  source = make_target(agent, type, collective)

  Log.debug("Unsubscribing from #{source[:name]}")
  @connection.unsubscribe(source[:name], source[:headers], source[:id])
  @subscriptions.delete(source[:id])
end