Class: Bunny::Session
- Inherits:
-
Object
- Object
- Bunny::Session
- Defined in:
- lib/bunny/session.rb
Overview
Represents AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary collapse
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_CHANNEL_MAX =
2047
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{ :capabilities => { :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, :"basic.nack" => true, :"connection.blocked" => true, # See http://www.rabbitmq.com/auth-notification.html :authentication_failure_close => true }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, :information => "http://rubybunny.info", }
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
- DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]
Instance Attribute Summary collapse
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#connection_name ⇒ Object
readonly
Returns the value of attribute connection_name.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g.
-
#network_recovery_interval ⇒ Object
readonly
Returns the value of attribute network_recovery_interval.
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary collapse
-
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
Instance Method Summary collapse
-
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when the connection recovery failed after the specified numbers of recovery attempts.
-
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#close(await_response = true) ⇒ Object
(also: #stop)
Closes the connection.
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
-
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
- #heartbeat_disabled?(val) ⇒ Boolean protected
- #heartbeat_interval ⇒ Integer deprecated Deprecated.
-
#heartbeat_timeout ⇒ Integer
Heartbeat timeout used.
- #host ⇒ Object
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #ignoring_io_errors(&block) ⇒ Object protected
-
#initialize(connection_string_or_opts = , optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
-
#manually_closed? ⇒ Boolean
True if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
- #normalize_auth_mechanism(value) ⇒ Object protected
- #normalize_client_channel_max(n) ⇒ Object protected
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
-
#password ⇒ String
Password used.
- #port ⇒ Object
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
- #reset_address_index ⇒ Object
-
#start ⇒ Object
Starts the connection process.
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
- #update_secret(value, reason) ⇒ Object
-
#username ⇒ String
Username used.
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = , optz = Hash.new) ⇒ Session
Returns a new instance of Session.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/bunny/session.rb', line 142 def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) opts = case (connection_string_or_opts) when nil then Hash.new when String then self.class.parse_uri(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle } @opts = opts log_file = opts[:log_file] || opts[:logfile] || STDOUT log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN # we might need to log a warning about ill-formatted IPv6 address but # progname includes hostname, so init like this first @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level)) @addresses = self.addresses_from(opts) @address_index = 0 @transport = nil @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @threaded = opts.fetch(:threaded, true) # re-init, see above @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) (opts) @last_connection_error = nil # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] | opts[:automatic_recovery] end @recovering_from_network_failure = false @max_recovery_attempts = opts[:recovery_attempts] @recovery_attempts = @max_recovery_attempts # When this is set, connection attempts won't be reset after # successful reconnection. Some find this behavior more sensible # than the per-failure attempt counter. MK. @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true) @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @manually_closed = false @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) # will be-renegotiated during connection tuning steps. MK. @channel_max = @client_channel_max @heartbeat_sender = nil @client_heartbeat = self.heartbeat_from(opts) client_props = opts[:properties] || opts[:client_properties] || {} @connection_name = client_props[:connection_name] || opts[:connection_name] @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props) .merge(connection_name: connection_name) @mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN")) @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) @mutex_impl = @opts.fetch(:mutex_impl, Monitor) # mutex for the channel id => channel hash @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new @status_mutex = @mutex_impl.new @address_index_mutex = @mutex_impl.new @channels = Hash.new @recovery_attempt_started = opts[:recovery_attempt_started] @recovery_completed = opts[:recovery_completed] @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted] @session_error_handler = opts.fetch(:session_error_handler, Thread.current) @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup self.reset_continuations self.initialize_transport end |
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def channel_id_allocator @channel_id_allocator end |
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def channel_max @channel_max end |
#connection_name ⇒ Object (readonly)
Returns the value of attribute connection_name.
93 94 95 |
# File 'lib/bunny/session.rb', line 93 def connection_name @connection_name end |
#continuation_timeout ⇒ Integer (readonly)
Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.
91 92 93 |
# File 'lib/bunny/session.rb', line 91 def continuation_timeout @continuation_timeout end |
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def frame_max @frame_max end |
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def heartbeat @heartbeat end |
#logger ⇒ Logger (readonly)
89 90 91 |
# File 'lib/bunny/session.rb', line 89 def logger @logger end |
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
87 88 89 |
# File 'lib/bunny/session.rb', line 87 def mechanism @mechanism end |
#network_recovery_interval ⇒ Object (readonly)
Returns the value of attribute network_recovery_interval.
92 93 94 |
# File 'lib/bunny/session.rb', line 92 def network_recovery_interval @network_recovery_interval end |
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def pass @pass end |
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
95 96 97 |
# File 'lib/bunny/session.rb', line 95 def recoverable_exceptions @recoverable_exceptions end |
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_authentication_mechanisms @server_authentication_mechanisms end |
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_capabilities @server_capabilities end |
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_locales @server_locales end |
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_properties @server_properties end |
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
94 95 96 |
# File 'lib/bunny/session.rb', line 94 def socket_configurator @socket_configurator end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def status @status end |
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def threaded @threaded end |
#transport ⇒ Bunny::Transport (readonly)
81 82 83 |
# File 'lib/bunny/session.rb', line 81 def transport @transport end |
#user ⇒ Object (readonly)
Returns the value of attribute user.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def user @user end |
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def vhost @vhost end |
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
502 503 504 |
# File 'lib/bunny/session.rb', line 502 def self.parse_uri(uri) AMQ::Settings.configure(uri) end |
Instance Method Details
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when the connection recovery failed after the specified numbers of recovery attempts.
563 564 565 |
# File 'lib/bunny/session.rb', line 563 def after_recovery_attempts_exhausted(&block) @recovery_attempts_exhausted = block end |
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
556 557 558 |
# File 'lib/bunny/session.rb', line 556 def after_recovery_completed(&block) @recovery_completed = block end |
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
466 467 468 |
# File 'lib/bunny/session.rb', line 466 def automatically_recover? @automatically_recover end |
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
550 551 552 |
# File 'lib/bunny/session.rb', line 550 def before_recovery_attempt_starts(&block) @recovery_attempt_started = block end |
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
494 495 496 |
# File 'lib/bunny/session.rb', line 494 def blocked? @blocked end |
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
907 908 909 910 911 912 913 914 915 916 917 |
# File 'lib/bunny/session.rb', line 907 def clean_up_and_fail_on_connection_close!(method) @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) clean_up_on_shutdown if threaded? @session_error_handler.raise(@last_connection_error) else raise @last_connection_error end end |
#clean_up_on_shutdown ⇒ Object
919 920 921 922 923 924 925 926 927 928 929 930 931 |
# File 'lib/bunny/session.rb', line 919 def clean_up_on_shutdown begin shut_down_all_consumer_work_pools! maybe_shutdown_reader_loop maybe_shutdown_heartbeat_sender rescue ShutdownSignal => _sse # no-op rescue Exception => e @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.}" ensure close_transport end end |
#close(await_response = true) ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/bunny/session.rb', line 398 def close(await_response = true) @status_mutex.synchronize { @status = :closing } ignoring_io_errors do if @transport.open? @logger.debug "Transport is still open..." close_all_channels @logger.debug "Will close all channels...." self.close_connection(await_response) end clean_up_on_shutdown end @status_mutex.synchronize do @status = :closed @manually_closed = true end @logger.debug "Connection is closed" true end |
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
448 449 450 |
# File 'lib/bunny/session.rb', line 448 def closed? @status_mutex.synchronize { @status == :closed } end |
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing.
443 444 445 |
# File 'lib/bunny/session.rb', line 443 def closing? @status_mutex.synchronize { @status == :closing } end |
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.
300 301 302 303 304 |
# File 'lib/bunny/session.rb', line 300 def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end |
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
437 438 439 |
# File 'lib/bunny/session.rb', line 437 def connecting? status == :connecting end |
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 |
# File 'lib/bunny/session.rb', line 380 def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n raise ConnectionAlreadyClosed if manually_closed? raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected? @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)) ch.open ch end end end |
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
536 537 538 539 540 541 542 543 544 545 546 |
# File 'lib/bunny/session.rb', line 536 def exchange_exists?(name) ch = create_channel begin ch.exchange(name, :passive => true) true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#heartbeat_disabled?(val) ⇒ Boolean (protected)
1343 1344 1345 |
# File 'lib/bunny/session.rb', line 1343 def heartbeat_disabled?(val) 0 == val || val.nil? end |
#heartbeat_interval ⇒ Integer
Returns Heartbeat timeout (not interval) used.
261 |
# File 'lib/bunny/session.rb', line 261 def heartbeat_interval; self.heartbeat; end |
#heartbeat_timeout ⇒ Integer
Returns Heartbeat timeout used.
264 |
# File 'lib/bunny/session.rb', line 264 def heartbeat_timeout; self.heartbeat; end |
#host ⇒ Object
283 284 285 |
# File 'lib/bunny/session.rb', line 283 def host @transport ? @transport.host : host_from_address(@addresses[@address_index]) end |
#hostname ⇒ String
Returns RabbitMQ hostname (or IP address) used.
251 |
# File 'lib/bunny/session.rb', line 251 def hostname; self.host; end |
#ignoring_io_errors(&block) ⇒ Object (protected)
1497 1498 1499 1500 1501 1502 1503 |
# File 'lib/bunny/session.rb', line 1497 def ignoring_io_errors(&block) begin block.call rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _ # ignore end end |
#inspect ⇒ Object
1214 1215 1216 |
# File 'lib/bunny/session.rb', line 1214 def inspect to_s end |
#local_port ⇒ Integer
Returns Client socket port.
307 308 309 |
# File 'lib/bunny/session.rb', line 307 def local_port @transport.local_address.ip_port end |
#manually_closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
453 454 455 |
# File 'lib/bunny/session.rb', line 453 def manually_closed? @status_mutex.synchronize { @manually_closed == true } end |
#normalize_auth_mechanism(value) ⇒ Object (protected)
1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 |
# File 'lib/bunny/session.rb', line 1486 def normalize_auth_mechanism(value) case value when [] then "PLAIN" when nil then "PLAIN" else value end end |
#normalize_client_channel_max(n) ⇒ Object (protected)
1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 |
# File 'lib/bunny/session.rb', line 1474 def normalize_client_channel_max(n) return CHANNEL_MAX_LIMIT if n.nil? return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT case n when 0 then CHANNEL_MAX_LIMIT else n end end |
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
477 478 479 |
# File 'lib/bunny/session.rb', line 477 def on_blocked(&block) @block_callback = block end |
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
486 487 488 |
# File 'lib/bunny/session.rb', line 486 def on_unblocked(&block) @unblock_callback = block end |
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
458 459 460 461 462 |
# File 'lib/bunny/session.rb', line 458 def open? @status_mutex.synchronize do (status == :open || status == :connected || status == :connecting) && @transport.open? end end |
#password ⇒ String
Returns Password used.
255 |
# File 'lib/bunny/session.rb', line 255 def password; self.pass; end |
#port ⇒ Object
287 288 289 |
# File 'lib/bunny/session.rb', line 287 def port @transport ? @transport.port : port_from_address(@addresses[@address_index]) end |
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
514 515 516 517 518 519 520 521 522 523 524 525 526 |
# File 'lib/bunny/session.rb', line 514 def queue_exists?(name) ch = create_channel begin ch.queue(name, :passive => true) true rescue Bunny::ResourceLocked => _ true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#reset_address_index ⇒ Object
291 292 293 |
# File 'lib/bunny/session.rb', line 291 def reset_address_index @address_index_mutex.synchronize { @address_index = 0 } end |
#start ⇒ Object
Starts the connection process.
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# File 'lib/bunny/session.rb', line 316 def start return self if connected? @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin begin # close existing transport if we have one, # to not leak sockets @transport.maybe_initialize_socket @transport.post_initialize_socket @transport.connect self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if threaded? rescue TCPConnectionFailed => e @logger.warn e. self.initialize_transport @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}" return self.start rescue @status_mutex.synchronize { @status = :not_connected } raise end rescue HostListDepleted self.reset_address_index @status_mutex.synchronize { @status = :not_connected } raise TCPConnectionFailedForAllHosts end @status_mutex.synchronize { @manually_closed = false } self end |
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
279 280 281 |
# File 'lib/bunny/session.rb', line 279 def threaded? @threaded end |
#to_s ⇒ String
1209 1210 1211 1212 |
# File 'lib/bunny/session.rb', line 1209 def to_s oid = ("0x%x" % (self.object_id << 1)) "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>" end |
#update_secret(value, reason) ⇒ Object
360 361 362 363 364 365 366 |
# File 'lib/bunny/session.rb', line 360 def update_secret(value, reason) @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason)) @last_update_secret_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_update_secret_ok end |
#username ⇒ String
Returns Username used.
253 |
# File 'lib/bunny/session.rb', line 253 def username; self.user; end |
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
273 274 275 |
# File 'lib/bunny/session.rb', line 273 def uses_ssl? @transport.uses_ssl? end |
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
267 268 269 |
# File 'lib/bunny/session.rb', line 267 def uses_tls? @transport.uses_tls? end |
#validate_connection_options(options) ⇒ Object
240 241 242 243 244 245 246 247 248 |
# File 'lib/bunny/session.rb', line 240 def () if [:hosts] && [:addresses] raise ArgumentError, "Connection options can't contain hosts and addresses at the same time" end if ([:host] || [:hostname]) && ([:hosts] || [:addresses]) @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one." end end |
#virtual_host ⇒ String
Returns Virtual host used.
257 |
# File 'lib/bunny/session.rb', line 257 def virtual_host; self.vhost; end |
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
425 426 427 428 429 430 431 432 433 434 |
# File 'lib/bunny/session.rb', line 425 def with_channel(n = nil) ch = create_channel(n) begin yield ch ensure ch.close if ch.open? end self end |