Class: MCollective::Connector::Activemq
- Defined in:
- lib/mcollective/connector/activemq.rb
Overview
Handles sending and receiving messages over the Stomp protocol for ActiveMQ servers specifically, we take advantages of ActiveMQ specific features and enhancements to the Stomp protocol. For best results in a clustered environment use ActiveMQ 5.5.0 at least.
This plugin takes an entirely different approach to dealing with ActiveMQ from the more generic stomp connector.
- Agents use /topic/<collective>.<agent>.agent
- Replies use temp-topics so they are private and transient.
- Point to Point messages using topics are supported by subscribing to
/queue/<collective>.nodes with a selector "mc_identity = 'identity'
The use of temp-topics for the replies is a huge improvement over the old style. In the old way all clients got replies for all clients that were active at that time, this would mean that they would need to decrypt, validate etc in order to determine if they need to ignore the message, this was computationally expensive and on large busy networks the messages were being sent all over the show cross broker boundaries.
The new way means the messages go point2point back to only whoever requested the message, they only get their own replies and this is ap private channel that casual observers cannot just snoop into.
This plugin supports 1.1.6 and newer of the Stomp rubygem.
connector = activemq
plugin.activemq.pool.size = 2
plugin.activemq.pool.1.host = stomp1.your.net
plugin.activemq.pool.1.port = 61613
plugin.activemq.pool.1.user = you
plugin.activemq.pool.1.password = secret
plugin.activemq.pool.1.ssl = true
plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
plugin.activemq.pool.1.ssl.key = /path/to/your.key
plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
plugin.activemq.pool.1.ssl.fallback = true
plugin.activemq.pool.1.ssl.ciphers = TLSv1:!MD5:!LOW:!EXPORT
plugin.activemq.pool.2.host = stomp2.your.net
plugin.activemq.pool.2.port = 61613
plugin.activemq.pool.2.user = you
plugin.activemq.pool.2.password = secret
plugin.activemq.pool.2.ssl = false
Using this method you can supply just STOMP_USER and STOMP_PASSWORD. The port will default to 61613 if not specified.
The ssl options are only usable in version of the Stomp gem newer than 1.2.2 where these will imply full SSL validation will be done and you’ll only be able to connect to a ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true and do not supply the cert, key and ca properties or if you have an older gem it will fall back to unverified mode only if ssl.fallback is true
In addition you can set the following options for the rubygem:
plugin.activemq.initial_reconnect_delay = 0.01
plugin.activemq.max_reconnect_delay = 30.0
plugin.activemq.use_exponential_back_off = true
plugin.activemq.back_off_multiplier = 2
plugin.activemq.max_reconnect_attempts = 0
plugin.activemq.randomize = false
plugin.activemq.timeout = -1
You can set the initial connetion timeout - this is when your stomp server is simply unreachable - after which it would failover to the next in the pool:
plugin.activemq.connect_timeout = 30
ActiveMQ JMS message priorities can be set:
plugin.activemq.priority = 4
This plugin supports Stomp protocol 1.1 when combined with the stomp gem version 1.2.10 or newer. To enable network heartbeats which will help keep the connection alive over NAT connections and aggresive session tracking firewalls you can set:
plugin.activemq.heartbeat_interval = 30
which will cause a heartbeat to be sent on 30 second intervals and one to be expected from the broker every 30 seconds. The shortest supported period is 30 seconds, if you set it lower it will get forced to 30 seconds.
After 2 failures to receive a heartbeat the connection will be reset via the normal failover mechanism.
By default if heartbeat_interval is set it will request Stomp 1.1 but support fallback to 1.0, but you can enable strict Stomp 1.1 only operation
plugin.activemq.stomp_1_0_fallback = 0
Defined Under Namespace
Classes: DummyError, EventLogger
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
-
#connect(connector = ::Stomp::Connection) ⇒ Object
Connects to the ActiveMQ middleware.
- #connection_headers ⇒ Object
-
#disconnect ⇒ Object
Disconnects from the ActiveMQ connection.
-
#exponential_back_off ⇒ Object
Calculate the exponential backoff needed.
-
#get_bool_option(val, default) ⇒ Object
looks up a boolean value in the config.
-
#get_cert_file(poolnum) ⇒ Object
Returns the name of the certficate file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number.
-
#get_env_or_option(env, opt, default = nil) ⇒ Object
looks in the environment first then in the config file for a specific option, accepts an optional default.
-
#get_key_file(poolnum) ⇒ Object
Returns the name of the private key file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number.
-
#get_option(opt, default = nil) ⇒ Object
looks for a config option, accepts an optional default.
- #headers_for(msg, identity = nil) ⇒ Object
-
#initialize ⇒ Activemq
constructor
A new instance of Activemq.
- #make_target(agent, type, collective) ⇒ Object
-
#publish(msg) ⇒ Object
Sends a message to the ActiveMQ connection.
-
#receive ⇒ Object
Receives a message from the ActiveMQ connection.
-
#ssl_parameters(poolnum, fallback) ⇒ Object
Sets the SSL paramaters for a specific connection.
- #stomp_version ⇒ Object
- #stomp_version_supports_heartbeat? ⇒ Boolean
-
#subscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue.
- #target_for(msg) ⇒ Object
-
#unsubscribe(agent, type, collective) ⇒ Object
UnSubscribe to a topic or queue.
Methods inherited from Base
Constructor Details
#initialize ⇒ Activemq
Returns a new instance of Activemq.
200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/mcollective/connector/activemq.rb', line 200 def initialize @config = Config.instance @subscriptions = [] @msgpriority = 0 @base64 = false @use_exponential_back_off = get_bool_option("activemq.use_exponential_back_off", "true") @initial_reconnect_delay = Float(get_option("activemq.initial_reconnect_delay", 0.01)) @back_off_multiplier = Integer(get_option("activemq.back_off_multiplier", 2)) @max_reconnect_delay = Float(get_option("activemq.max_reconnect_delay", 30.0)) @reconnect_delay = @initial_reconnect_delay Log.info("ActiveMQ connector initialized. Using stomp-gem #{stomp_version}") end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
97 98 99 |
# File 'lib/mcollective/connector/activemq.rb', line 97 def connection @connection end |
Instance Method Details
#connect(connector = ::Stomp::Connection) ⇒ Object
Connects to the ActiveMQ middleware
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/mcollective/connector/activemq.rb', line 215 def connect(connector = ::Stomp::Connection) if @connection Log.debug("Already connection, not re-initializing connection") return end begin @base64 = get_bool_option("activemq.base64", "false") @msgpriority = get_option("activemq.priority", 0).to_i pools = Integer(get_option("activemq.pool.size")) hosts = [] 1.upto(pools) do |poolnum| host = {} host[:host] = get_option("activemq.pool.#{poolnum}.host") host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user", '') host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password", '') host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false") # if ssl is enabled set :ssl to the hash of parameters if host[:ssl] host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false")) end Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool") hosts << host end raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0 connection = {:hosts => hosts} # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of # these can be guessed, the documentation isn't clear connection[:use_exponential_back_off] = @use_exponential_back_off connection[:initial_reconnect_delay] = @initial_reconnect_delay connection[:back_off_multiplier] = @back_off_multiplier connection[:max_reconnect_delay] = @max_reconnect_delay connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0)) connection[:randomize] = get_bool_option("activemq.randomize", "false") connection[:backup] = get_bool_option("activemq.backup", "false") connection[:timeout] = Integer(get_option("activemq.timeout", -1)) connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30)) connection[:reliable] = true connection[:connect_headers] = connection_headers connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 0)) connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2)) connection[:logger] = EventLogger.new @connection = connector.new(connection) rescue ClientTimeoutError => e raise e rescue Exception => e raise("Could not connect to ActiveMQ Server: #{e}") end end |
#connection_headers ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/mcollective/connector/activemq.rb', line 285 def connection_headers headers = {:"accept-version" => "1.0"} heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0)) stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true) headers[:host] = get_option("activemq.vhost", "mcollective") if heartbeat_interval > 0 unless stomp_version_supports_heartbeat? raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem") end if heartbeat_interval < 30 Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s") heartbeat_interval = 30 end heartbeat_interval = heartbeat_interval * 1000 headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500] if stomp_1_0_fallback headers[:"accept-version"] = "1.1,1.0" else headers[:"accept-version"] = "1.1" end else if stomp_version_supports_heartbeat? Log.info("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval") end end headers end |
#disconnect ⇒ Object
Disconnects from the ActiveMQ connection
478 479 480 481 482 |
# File 'lib/mcollective/connector/activemq.rb', line 478 def disconnect Log.debug("Disconnecting from ActiveMQ") @connection.disconnect @connection = nil end |
#exponential_back_off ⇒ Object
Calculate the exponential backoff needed
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/mcollective/connector/activemq.rb', line 371 def exponential_back_off if !@use_exponential_back_off return nil end backoff = @reconnect_delay # calculate next delay @reconnect_delay = @reconnect_delay * @back_off_multiplier # cap at max reconnect delay if @reconnect_delay > @max_reconnect_delay @reconnect_delay = @max_reconnect_delay end return backoff end |
#get_bool_option(val, default) ⇒ Object
looks up a boolean value in the config
565 566 567 |
# File 'lib/mcollective/connector/activemq.rb', line 565 def get_bool_option(val, default) Util.str_to_bool(@config.pluginconf.fetch(val, default)) end |
#get_cert_file(poolnum) ⇒ Object
Returns the name of the certficate file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.
366 367 368 |
# File 'lib/mcollective/connector/activemq.rb', line 366 def get_cert_file(poolnum) ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false) end |
#get_env_or_option(env, opt, default = nil) ⇒ Object
looks in the environment first then in the config file for a specific option, accepts an optional default.
raises an exception when it cant find a value anywhere
546 547 548 549 550 551 552 |
# File 'lib/mcollective/connector/activemq.rb', line 546 def get_env_or_option(env, opt, default=nil) return ENV[env] if ENV.include?(env) return @config.pluginconf[opt] if @config.pluginconf.include?(opt) return default if default raise("No #{env} environment or plugin.#{opt} configuration option given") end |
#get_key_file(poolnum) ⇒ Object
Returns the name of the private key file used by ActiveMQ Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.
358 359 360 |
# File 'lib/mcollective/connector/activemq.rb', line 358 def get_key_file(poolnum) ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false) end |
#get_option(opt, default = nil) ⇒ Object
looks for a config option, accepts an optional default
raises an exception when it cant find a value anywhere
557 558 559 560 561 562 |
# File 'lib/mcollective/connector/activemq.rb', line 557 def get_option(opt, default=nil) return @config.pluginconf[opt] if @config.pluginconf.include?(opt) return default unless default.nil? raise("No plugin.#{opt} configuration option given") end |
#headers_for(msg, identity = nil) ⇒ Object
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 |
# File 'lib/mcollective/connector/activemq.rb', line 484 def headers_for(msg, identity=nil) headers = {} headers = {"priority" => @msgpriority} if @msgpriority > 0 headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s # set the expires header based on the TTL, we build a small additional # timeout of 10 seconds in here to allow for network latency etc headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s if [:request, :direct_request].include?(msg.type) target = make_target(msg.agent, :reply, msg.collective) if msg.reply_to headers["reply-to"] = msg.reply_to else headers["reply-to"] = target[:name] end headers["mc_identity"] = identity if msg.type == :direct_request end headers["mc_sender"] = Config.instance.identity return headers end |
#make_target(agent, type, collective) ⇒ Object
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 |
# File 'lib/mcollective/connector/activemq.rb', line 512 def make_target(agent, type, collective) raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type) raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective) target = {:name => nil, :headers => {}} case type when :reply target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}", Client.request_sequence].join(".") when :broadcast target[:name] = ["/topic/" + collective, agent, :agent].join(".") when :request target[:name] = ["/topic/" + collective, agent, :agent].join(".") when :direct_request target[:name] = ["/queue/" + collective, :nodes].join(".") when :directed target[:name] = ["/queue/" + collective, :nodes].join(".") target[:headers]["selector"] = "mc_identity = '#{@config.identity}'" target[:id] = "%s_directed_to_identity" % collective end target[:id] = target[:name] unless target[:id] target end |
#publish(msg) ⇒ Object
Sends a message to the ActiveMQ connection
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
# File 'lib/mcollective/connector/activemq.rb', line 421 def publish(msg) msg.base64_encode! if @base64 target = target_for(msg) if msg.type == :direct_request msg.discovered_hosts.each do |node| target[:headers] = headers_for(msg, node) Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") @connection.publish(target[:name], msg.payload, target[:headers]) end else target[:headers].merge!(headers_for(msg)) Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") @connection.publish(target[:name], msg.payload, target[:headers]) end end |
#receive ⇒ Object
Receives a message from the ActiveMQ connection
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/mcollective/connector/activemq.rb', line 390 def receive Log.debug("Waiting for a message from ActiveMQ") # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection # handling it sets the connection to closed. If we happen to be receiving at just # that time we will get an exception warning about the closed connection so handling # that here with a sleep and a retry. begin msg = @connection.receive rescue ::Stomp::Error::NoCurrentConnection sleep 1 retry end # In older stomp gems an attempt to receive after failed authentication can return nil if msg.nil? raise MessageNotReceived.new(exponential_back_off), "No message received from ActiveMQ." end # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types if msg.command != 'MESSAGE' Log.debug("Unexpected '#{msg.command}' frame. Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}") raise UnexpectedMessageType.new(exponential_back_off), "Received frame of type '#{msg.command}' expected 'MESSAGE'" end Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers) end |
#ssl_parameters(poolnum, fallback) ⇒ Object
Sets the SSL paramaters for a specific connection
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/mcollective/connector/activemq.rb', line 321 def ssl_parameters(poolnum, fallback) params = { :cert_file => get_cert_file(poolnum), :key_file => get_key_file(poolnum), :ts_files => get_option("activemq.pool.#{poolnum}.ssl.ca", false), :ciphers => get_option("activemq.pool.#{poolnum}.ssl.ciphers", false), } raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file]) raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file]) params[:ts_files].split(",").each do |ca| raise "Cannot find CA file #{ca}" unless File.exist?(ca) end begin ::Stomp::SSLParams.new(params) rescue NameError raise "Stomp gem >= 1.2.2 is needed" end rescue Exception => e if fallback Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}") return true else Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}") raise(e) end end |
#stomp_version ⇒ Object
277 278 279 |
# File 'lib/mcollective/connector/activemq.rb', line 277 def stomp_version ::Stomp::Version::STRING end |
#stomp_version_supports_heartbeat? ⇒ Boolean
281 282 283 |
# File 'lib/mcollective/connector/activemq.rb', line 281 def stomp_version_supports_heartbeat? return Util.versioncmp(stomp_version, "1.2.10") >= 0 end |
#subscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue
444 445 446 447 448 449 450 451 452 453 454 |
# File 'lib/mcollective/connector/activemq.rb', line 444 def subscribe(agent, type, collective) source = make_target(agent, type, collective) unless @subscriptions.include?(source[:id]) Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}") @connection.subscribe(source[:name], source[:headers], source[:id]) @subscriptions << source[:id] end rescue ::Stomp::Error::DuplicateSubscription Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring") end |
#target_for(msg) ⇒ Object
465 466 467 468 469 470 471 472 473 474 475 |
# File 'lib/mcollective/connector/activemq.rb', line 465 def target_for(msg) if msg.type == :reply target = {:name => msg.request.headers["reply-to"], :headers => {}} elsif [:request, :direct_request].include?(msg.type) target = make_target(msg.agent, msg.type, msg.collective) else raise "Don't now how to create a target for message type #{msg.type}" end return target end |
#unsubscribe(agent, type, collective) ⇒ Object
UnSubscribe to a topic or queue
457 458 459 460 461 462 463 |
# File 'lib/mcollective/connector/activemq.rb', line 457 def unsubscribe(agent, type, collective) source = make_target(agent, type, collective) Log.debug("Unsubscribing from #{source[:name]}") @connection.unsubscribe(source[:name], source[:headers], source[:id]) @subscriptions.delete(source[:id]) end |