Class: MCollective::Connector::Rabbitmq
- Defined in:
- lib/mcollective/connector/rabbitmq.rb
Defined Under Namespace
Classes: EventLogger
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
-
#connect(connector = ::Stomp::Connection) ⇒ Object
Connects to the RabbitMQ middleware.
- #connection_headers ⇒ Object
-
#disconnect ⇒ Object
Disconnects from the RabbitMQ connection.
-
#exponential_back_off ⇒ Object
Calculate the exponential backoff needed.
-
#get_bool_option(val, default) ⇒ Object
looks up a boolean value in the config.
-
#get_cert_file(poolnum) ⇒ Object
Returns the name of the certificate file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists, where X is the RabbitMQ pool number.
-
#get_env_or_option(env, opt, default = nil) ⇒ Object
looks in the environment first then in the config file for a specific option, accepts an optional default.
-
#get_key_file(poolnum) ⇒ Object
Returns the name of the private key file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists, where X is the RabbitMQ pool number.
-
#get_option(opt, default = nil) ⇒ Object
looks for a config option, accepts an optional default.
-
#initialize ⇒ Rabbitmq
constructor
A new instance of Rabbitmq.
- #make_target(agent, type, collective, reply_to = nil, node = nil) ⇒ Object
-
#publish(msg) ⇒ Object
Sends a message to the RabbitMQ connection.
-
#receive ⇒ Object
Receives a message from the RabbitMQ connection.
-
#ssl_parameters(poolnum, fallback) ⇒ Object
Sets the SSL paramaters for a specific connection.
- #stomp_version ⇒ Object
- #stomp_version_supports_heartbeat? ⇒ Boolean
-
#subscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue.
- #target_for(msg, node = nil) ⇒ Object
-
#unsubscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue.
Methods inherited from Base
Constructor Details
#initialize ⇒ Rabbitmq
Returns a new instance of Rabbitmq.
96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 96 def initialize @config = Config.instance @subscriptions = [] @base64 = false @use_exponential_back_off = get_bool_option("rabbitmq.use_exponential_back_off", "true") @initial_reconnect_delay = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01)) @back_off_multiplier = Integer(get_option("rabbitmq.back_off_multiplier", 2)) @max_reconnect_delay = Float(get_option("rabbitmq.max_reconnect_delay", 30.0)) @reconnect_delay = @initial_reconnect_delay Log.info("RabbitMQ connector initialized. Using stomp-gem #{stomp_version}") end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
6 7 8 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 6 def connection @connection end |
Instance Method Details
#connect(connector = ::Stomp::Connection) ⇒ Object
Connects to the RabbitMQ middleware
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 110 def connect(connector = ::Stomp::Connection) if @connection Log.debug("Already connection, not re-initializing connection") return end begin @base64 = get_bool_option("rabbitmq.base64", "false") pools = Integer(get_option("rabbitmq.pool.size")) hosts = [] middleware_user = '' middleware_password = '' prompt_for_username = get_bool_option("rabbitmq.prompt_user", "false") prompt_for_password = get_bool_option("rabbitmq.prompt_password", "false") if prompt_for_username Log.debug("No previous user exists and rabbitmq.prompt-user is set to true") print "Please enter user to connect to middleware: " middleware_user = STDIN.gets.chomp end if prompt_for_password Log.debug("No previous password exists and rabbitmq.prompt-password is set to true") middleware_password = MCollective::Util.get_hidden_input("Please enter password: ") print "\n" end 1.upto(pools) do |poolnum| host = {} host[:host] = get_option("rabbitmq.pool.#{poolnum}.host") host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false") # read user from config file host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user", middleware_user) if prompt_for_username and host[:login] != middleware_user Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+ "plugin.rabbitmq.prompt_user should be set to false to remove the prompt.") end # read password from config file host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password", middleware_password) if prompt_for_password and host[:passcode] != middleware_password Log.info("Using password from config file to connect to #{host[:host]}. "+ "plugin.rabbitmq.prompt_password should be set to false to remove the prompt.") end # if ssl is enabled set :ssl to the hash of parameters if host[:ssl] host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false")) end Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool") hosts << host end raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0 connection = {:hosts => hosts} # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of # these can be guessed, the documentation isn't clear connection[:use_exponential_back_off] = @use_exponential_back_off connection[:initial_reconnect_delay] = @initial_reconnect_delay connection[:back_off_multiplier] = @back_off_multiplier connection[:max_reconnect_delay] = @max_reconnect_delay connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0)) connection[:randomize] = get_bool_option("rabbitmq.randomize", "false") connection[:backup] = get_bool_option("rabbitmq.backup", "false") connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1)) connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30)) connection[:reliable] = true connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 0)) connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2)) connection[:connect_headers] = connection_headers connection[:logger] = EventLogger.new @connection = connector.new(connection) rescue ClientTimeoutError => e raise e rescue Exception => e raise("Could not connect to RabbitMQ Server: #{e}") end end |
#connection_headers ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 200 def connection_headers headers = {:"accept-version" => "1.0"} heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0)) stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true) headers[:host] = get_option("rabbitmq.vhost", "/") if heartbeat_interval > 0 unless stomp_version_supports_heartbeat? raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem") end if heartbeat_interval < 30 Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s") heartbeat_interval = 30 end heartbeat_interval = heartbeat_interval * 1000 headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500] if stomp_1_0_fallback headers[:"accept-version"] = "1.1,1.0" else headers[:"accept-version"] = "1.1" end else if stomp_version_supports_heartbeat? Log.info("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval") end end headers end |
#disconnect ⇒ Object
Disconnects from the RabbitMQ connection
481 482 483 484 485 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 481 def disconnect Log.debug("Disconnecting from RabbitMQ") @connection.disconnect @connection = nil end |
#exponential_back_off ⇒ Object
Calculate the exponential backoff needed
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 294 def exponential_back_off if !@use_exponential_back_off return nil end backoff = @reconnect_delay # calculate next delay @reconnect_delay = @reconnect_delay * @back_off_multiplier # cap at max reconnect delay if @reconnect_delay > @max_reconnect_delay @reconnect_delay = @max_reconnect_delay end return backoff end |
#get_bool_option(val, default) ⇒ Object
looks up a boolean value in the config
510 511 512 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 510 def get_bool_option(val, default) Util.str_to_bool(@config.pluginconf.fetch(val, default)) end |
#get_cert_file(poolnum) ⇒ Object
Returns the name of the certificate file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists, where X is the RabbitMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.
289 290 291 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 289 def get_cert_file(poolnum) ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false) end |
#get_env_or_option(env, opt, default = nil) ⇒ Object
looks in the environment first then in the config file for a specific option, accepts an optional default.
raises an exception when it cant find a value anywhere
491 492 493 494 495 496 497 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 491 def get_env_or_option(env, opt, default=nil) return ENV[env] if ENV.include?(env) return @config.pluginconf[opt] if @config.pluginconf.include?(opt) return default if default raise("No #{env} environment or plugin.#{opt} configuration option given") end |
#get_key_file(poolnum) ⇒ Object
Returns the name of the private key file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists, where X is the RabbitMQ pool number. If the environment variable doesn’t exist, it will try and load the value from the config.
281 282 283 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 281 def get_key_file(poolnum) ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false) end |
#get_option(opt, default = nil) ⇒ Object
looks for a config option, accepts an optional default
raises an exception when it cant find a value anywhere
502 503 504 505 506 507 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 502 def get_option(opt, default=nil) return @config.pluginconf[opt] if @config.pluginconf.include?(opt) return default unless default.nil? raise("No plugin.#{opt} configuration option given") end |
#make_target(agent, type, collective, reply_to = nil, node = nil) ⇒ Object
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 386 def make_target(agent, type, collective, reply_to=nil, node=nil) raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type) raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective) agents_multiplex = get_bool_option("rabbitmq.agents_multiplex", "false") target = {:name => "", :headers => {}, :id => nil} if reply_to reply_path = reply_to elsif get_bool_option("rabbitmq.use_reply_exchange", false) reply_path = "/exchange/mcollective_reply/%s_%s_%s" % [ @config.identity, $$, Client.request_sequence ] else reply_path = "/temp-queue/mcollective_reply_%s" % agent end case type when :reply # receiving replies on a temp queue target[:name] = reply_path target[:id] = "mcollective_%s_replies" % agent when :broadcast, :request # publishing a request to all nodes with an agent if agents_multiplex target[:name] = "/exchange/%s_broadcast" % collective target[:id] = "%s_broadcast" % collective else target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent] target[:id] = "%s_broadcast_%s" % [collective, agent] end if reply_to target[:headers]["reply-to"] = reply_to else target[:headers]["reply-to"] = reply_path end when :direct_request # a request to a specific node raise "Directed requests need to have a node identity" unless node target[:name] = "/exchange/%s_directed/%s" % [ collective, node] target[:headers]["reply-to"] = reply_path when :directed # subscribing to directed messages target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ] target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ] end target end |
#publish(msg) ⇒ Object
Sends a message to the RabbitMQ connection
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 345 def publish(msg) msg.base64_encode! if @base64 if msg.type == :direct_request msg.discovered_hosts.each do |node| target = target_for(msg, node) Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") @connection.publish(target[:name], msg.payload, target[:headers]) end else target = target_for(msg) Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'") @connection.publish(target[:name], msg.payload, target[:headers]) end end |
#receive ⇒ Object
Receives a message from the RabbitMQ connection
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 313 def receive Log.debug("Waiting for a message from RabbitMQ") # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection # handling it sets the connection to closed. If we happen to be receiving at just # that time we will get an exception warning about the closed connection so handling # that here with a sleep and a retry. begin msg = @connection.receive rescue ::Stomp::Error::NoCurrentConnection sleep 1 retry end # In older stomp gems an attempt to receive after failed authentication can return nil if msg.nil? raise MessageNotReceived.new(exponential_back_off), "No message received from RabbitMQ." end raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/ # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types if msg.command != 'MESSAGE' Log.debug("Unexpected '#{msg.command}' frame. Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}") raise UnexpectedMessageType.new(exponential_back_off), "Received frame of type '#{msg.command}' expected 'MESSAGE'" end Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers) end |
#ssl_parameters(poolnum, fallback) ⇒ Object
Sets the SSL paramaters for a specific connection
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 244 def ssl_parameters(poolnum, fallback) params = { :cert_file => get_cert_file(poolnum), :key_file => get_key_file(poolnum), :ts_files => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false), :ciphers => get_option("rabbitmq.pool.#{poolnum}.ssl.ciphers", false), } raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files] raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file]) raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file]) params[:ts_files].split(",").each do |ca| raise "Cannot find CA file #{ca}" unless File.exist?(ca) end begin ::Stomp::SSLParams.new(params) rescue NameError raise "Stomp gem >= 1.2.2 is needed" end rescue Exception => e if fallback Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}") return true else Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}") raise(e) end end |
#stomp_version ⇒ Object
235 236 237 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 235 def stomp_version ::Stomp::Version::STRING end |
#stomp_version_supports_heartbeat? ⇒ Boolean
239 240 241 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 239 def stomp_version_supports_heartbeat? return Util.versioncmp(stomp_version, "1.2.10") >= 0 end |
#subscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 434 def subscribe(agent, type, collective) if type == :reply # On rabbitmq if you send a message with a reply-to: header set to # '/temp-queue/*' it automatically creates a private queue, munges # the reply-to: header to point to this private queue, and # subscribes you to it. As such you should never attempt to # SUBSCRIBE or UNSUBSCRIBE to '/temp-queue/*' directly as that'll # cause great pain and suffering. # https://www.rabbitmq.com/stomp.html#d.tqd # The exception to this is in 'use_reply_exchange' mode, when the # reply-to will be set to a queue in an explicit exchange. if !get_bool_option("rabbitmq.use_reply_exchange", false) # We aren't in 'use_reply_exchange' mode, don't subscribe. return end end source = make_target(agent, type, collective) unless @subscriptions.include?(source[:id]) Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}") @connection.subscribe(source[:name], source[:headers], source[:id]) @subscriptions << source[:id] end rescue ::Stomp::Error::DuplicateSubscription Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring") end |
#target_for(msg, node = nil) ⇒ Object
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 365 def target_for(msg, node=nil) if msg.type == :reply target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""} elsif [:request, :direct_request].include?(msg.type) target = make_target(msg.agent, msg.type, msg.collective, msg.reply_to, node) else raise "Don't now how to create a target for message type #{msg.type}" end # marks messages as valid for ttl + 10 seconds, we do this here # rather than in make_target as this should only be set on publish target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s target[:headers]["mc_sender"] = Config.instance.identity return target end |
#unsubscribe(agent, type, collective) ⇒ Object
Subscribe to a topic or queue
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 |
# File 'lib/mcollective/connector/rabbitmq.rb', line 464 def unsubscribe(agent, type, collective) if type == :reply # For a more detailed discussion of this logic, please see #subscribe if !get_bool_option("rabbitmq.use_reply_exchange", false) # We shouldn't try to unsubscribe from a '/temp-queue/*' queue. return end end source = make_target(agent, type, collective) Log.debug("Unsubscribing from #{source[:name]}") @connection.unsubscribe(source[:name], source[:headers], source[:id]) @subscriptions.delete(source[:id]) end |