Class: Bunny::Session
- Inherits:
-
Object
- Object
- Bunny::Session
- Defined in:
- lib/bunny/session.rb
Overview
Represents AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary collapse
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_FRAME_MAX =
131072
- CHANNEL_MAX_LIMIT =
Hard limit the user cannot go over regardless of server configuration.
65535
- DEFAULT_CHANNEL_MAX =
2047
- CONNECT_TIMEOUT =
backwards compatibility
Transport::DEFAULT_CONNECTION_TIMEOUT
- DEFAULT_CONTINUATION_TIMEOUT =
15000
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{ :capabilities => { :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, :"basic.nack" => true, :"connection.blocked" => true, # See http://www.rabbitmq.com/auth-notification.html :authentication_failure_close => true }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, :information => "http://rubybunny.info", }
- DEFAULT_LOCALE =
"en_GB"
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
- DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]
Instance Attribute Summary collapse
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#connection_name ⇒ Object
readonly
Returns the value of attribute connection_name.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g.
- #mutex_impl ⇒ Object readonly
-
#network_recovery_interval ⇒ Object
readonly
Returns the value of attribute network_recovery_interval.
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary collapse
-
.parse_uri(uri) ⇒ Hash
Parses an amqp URI into a hash that #initialize accepts.
Instance Method Summary collapse
- #addresses_from(options) ⇒ Object
-
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when the connection recovery failed after the specified numbers of recovery attempts.
-
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
- #announce_network_failure_recovery ⇒ Object
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#close(await_response = true) ⇒ Object
(also: #stop)
Closes the connection.
- #close_all_channels ⇒ Object
- #close_channel(ch) ⇒ Object
- #close_connection(await_response = true) ⇒ Object
- #close_transport ⇒ Object
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
- #decrement_recovery_attemp_counter! ⇒ Object
-
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
- #find_channel(number) ⇒ Object
-
#handle_frame(ch_number, method) ⇒ Object
Handles incoming frames and dispatches them.
- #handle_frameset(ch_number, frames) ⇒ Object
- #handle_network_failure(exception) ⇒ Object
- #heartbeat_from(options) ⇒ Object
- #heartbeat_interval ⇒ Integer deprecated Deprecated.
-
#heartbeat_timeout ⇒ Integer
Heartbeat timeout used.
- #host ⇒ Object
- #host_from_address(address) ⇒ Object
- #host_with_port?(address) ⇒ Boolean
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
-
#initialize(connection_string_or_opts = , optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
- #instantiate_connection_level_exception(frame) ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
-
#manually_closed? ⇒ Boolean
True if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
- #maybe_shutdown_reader_loop ⇒ Object
- #next_channel_id ⇒ Object
- #notify_of_recovery_attempt_start ⇒ Object
- #notify_of_recovery_attempts_exhausted ⇒ Object
- #notify_of_recovery_completion ⇒ Object
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
- #open_channel(ch) ⇒ Object
-
#password ⇒ String
Password used.
- #password_from(options) ⇒ Object
- #port ⇒ Object
- #port_from(options) ⇒ Object
- #port_from_address(address) ⇒ Object
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
- #raise_if_continuation_resulted_in_a_connection_error! ⇒ Object
- #reader_loop ⇒ Object
- #recover_channels ⇒ Object
- #recover_from_connection_close? ⇒ Boolean
- #recover_from_network_failure ⇒ Object
- #recoverable_network_failure?(exception) ⇒ Boolean
- #recovering_from_network_failure? ⇒ Boolean
- #recovery_attempts_limited? ⇒ Boolean
- #register_channel(ch) ⇒ Object
- #release_channel_id(i) ⇒ Object
- #reset_address_index ⇒ Object
- #reset_recovery_attempt_counter! ⇒ Object
-
#send_frame(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open.
-
#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open.
-
#send_frameset(frames, channel) ⇒ Object
Sends multiple frames, in one go.
-
#send_frameset_without_timeout(frames, channel) ⇒ Object
Sends multiple frames, one by one.
- #send_raw_without_timeout(data, channel) ⇒ Object
- #should_retry_recovery? ⇒ Boolean
- #signal_activity! ⇒ Object
-
#start ⇒ Object
Starts the connection process.
- #start_reader_loop ⇒ Object
- #synchronised_find_channel(number) ⇒ Object
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
-
#transport_write_timeout ⇒ Integer
Socket operation write timeout used by this connection.
- #unregister_channel(ch) ⇒ Object
- #update_secret(value, reason) ⇒ Object
-
#username ⇒ String
Username used.
- #username_from(options) ⇒ Object
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
- #vhost_from(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = , optz = Hash.new) ⇒ Session
Returns a new instance of Session.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/bunny/session.rb', line 142 def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) opts = case (connection_string_or_opts) when nil then Hash.new when String then self.class.parse_uri(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle } @opts = opts log_file = opts[:log_file] || opts[:logfile] || STDOUT log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN # we might need to log a warning about ill-formatted IPv6 address but # progname includes hostname, so init like this first @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level)) @addresses = self.addresses_from(opts) @address_index = 0 @transport = nil @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @threaded = opts.fetch(:threaded, true) # re-init, see above @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) (opts) @last_connection_error = nil # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] | opts[:automatic_recovery] end @recovering_from_network_failure = false @max_recovery_attempts = opts[:recovery_attempts] @recovery_attempts = @max_recovery_attempts # When this is set, connection attempts won't be reset after # successful reconnection. Some find this behavior more sensible # than the per-failure attempt counter. MK. @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true) @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @manually_closed = false @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) # will be-renegotiated during connection tuning steps. MK. @channel_max = @client_channel_max @heartbeat_sender = nil @client_heartbeat = self.heartbeat_from(opts) client_props = opts[:properties] || opts[:client_properties] || {} @connection_name = client_props[:connection_name] || opts[:connection_name] @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props) .merge(connection_name: connection_name) @mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN")) @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) @mutex_impl = @opts.fetch(:mutex_impl, Monitor) # mutex for the channel id => channel hash @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new @status_mutex = @mutex_impl.new @address_index_mutex = @mutex_impl.new @channels = Hash.new @recovery_attempt_started = opts[:recovery_attempt_started] @recovery_completed = opts[:recovery_completed] @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted] @session_error_handler = opts.fetch(:session_error_handler, Thread.current) @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup self.reset_continuations self.initialize_transport end |
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def channel_id_allocator @channel_id_allocator end |
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def channel_max @channel_max end |
#connection_name ⇒ Object (readonly)
Returns the value of attribute connection_name.
93 94 95 |
# File 'lib/bunny/session.rb', line 93 def connection_name @connection_name end |
#continuation_timeout ⇒ Integer (readonly)
Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.
91 92 93 |
# File 'lib/bunny/session.rb', line 91 def continuation_timeout @continuation_timeout end |
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def frame_max @frame_max end |
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def heartbeat @heartbeat end |
#logger ⇒ Logger (readonly)
89 90 91 |
# File 'lib/bunny/session.rb', line 89 def logger @logger end |
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. “PLAIN” or “EXTERNAL”
87 88 89 |
# File 'lib/bunny/session.rb', line 87 def mechanism @mechanism end |
#mutex_impl ⇒ Object (readonly)
296 297 298 |
# File 'lib/bunny/session.rb', line 296 def mutex_impl @mutex_impl end |
#network_recovery_interval ⇒ Object (readonly)
Returns the value of attribute network_recovery_interval.
92 93 94 |
# File 'lib/bunny/session.rb', line 92 def network_recovery_interval @network_recovery_interval end |
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def pass @pass end |
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
95 96 97 |
# File 'lib/bunny/session.rb', line 95 def recoverable_exceptions @recoverable_exceptions end |
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_authentication_mechanisms @server_authentication_mechanisms end |
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_capabilities @server_capabilities end |
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_locales @server_locales end |
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def server_properties @server_properties end |
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
94 95 96 |
# File 'lib/bunny/session.rb', line 94 def socket_configurator @socket_configurator end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def status @status end |
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def threaded @threaded end |
#transport ⇒ Bunny::Transport (readonly)
81 82 83 |
# File 'lib/bunny/session.rb', line 81 def transport @transport end |
#user ⇒ Object (readonly)
Returns the value of attribute user.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def user @user end |
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def vhost @vhost end |
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp URI into a hash that #initialize accepts.
502 503 504 |
# File 'lib/bunny/session.rb', line 502 def self.parse_uri(uri) AMQ::Settings.configure(uri) end |
Instance Method Details
#addresses_from(options) ⇒ Object
932 933 934 935 936 937 938 939 940 941 942 943 944 |
# File 'lib/bunny/session.rb', line 932 def addresses_from() shuffle_strategy = .fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy) addresses = [:host] || [:hostname] || [:addresses] || [:hosts] || ["#{DEFAULT_HOST}:#{port_from()}"] addresses = [addresses] unless addresses.is_a? Array addrs = addresses.map do |address| host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}" end shuffle_strategy.call(addrs) end |
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when the connection recovery failed after the specified numbers of recovery attempts.
561 562 563 |
# File 'lib/bunny/session.rb', line 561 def after_recovery_attempts_exhausted(&block) @recovery_attempts_exhausted = block end |
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
554 555 556 |
# File 'lib/bunny/session.rb', line 554 def after_recovery_completed(&block) @recovery_completed = block end |
#announce_network_failure_recovery ⇒ Object
771 772 773 774 775 776 777 |
# File 'lib/bunny/session.rb', line 771 def announce_network_failure_recovery if recovery_attempts_limited? @logger.warn "Will recover from a network failure (#{@recovery_attempts} out of #{@max_recovery_attempts} left)..." else @logger.warn "Will recover from a network failure (no retry limit)..." end end |
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
466 467 468 |
# File 'lib/bunny/session.rb', line 466 def automatically_recover? @automatically_recover end |
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
548 549 550 |
# File 'lib/bunny/session.rb', line 548 def before_recovery_attempt_starts(&block) @recovery_attempt_started = block end |
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise.
494 495 496 |
# File 'lib/bunny/session.rb', line 494 def blocked? @blocked end |
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
905 906 907 908 909 910 911 912 913 914 915 |
# File 'lib/bunny/session.rb', line 905 def clean_up_and_fail_on_connection_close!(method) @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) clean_up_on_shutdown if threaded? @session_error_handler.raise(@last_connection_error) else raise @last_connection_error end end |
#clean_up_on_shutdown ⇒ Object
917 918 919 920 921 922 923 924 925 926 927 928 929 |
# File 'lib/bunny/session.rb', line 917 def clean_up_on_shutdown begin shut_down_all_consumer_work_pools! maybe_shutdown_reader_loop maybe_shutdown_heartbeat_sender rescue ShutdownSignal => _sse # no-op rescue Exception => e @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.}" ensure close_transport end end |
#close(await_response = true) ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/bunny/session.rb', line 398 def close(await_response = true) @status_mutex.synchronize { @status = :closing } ignoring_io_errors do if @transport.open? @logger.debug "Transport is still open..." close_all_channels @logger.debug "Will close all channels...." self.close_connection(await_response) end clean_up_on_shutdown end @status_mutex.synchronize do @status = :closed @manually_closed = true end @logger.debug "Connection is closed" true end |
#close_all_channels ⇒ Object
611 612 613 614 615 616 617 |
# File 'lib/bunny/session.rb', line 611 def close_all_channels @channel_mutex.synchronize do @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end end end |
#close_channel(ch) ⇒ Object
586 587 588 589 590 591 592 593 594 595 596 597 598 |
# File 'lib/bunny/session.rb', line 586 def close_channel(ch) @channel_mutex.synchronize do n = ch.number @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0)) @last_channel_close_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! self.unregister_channel(ch) self.release_channel_id(ch.id) @last_channel_close_ok end end |
#close_connection(await_response = true) ⇒ Object
620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 |
# File 'lib/bunny/session.rb', line 620 def close_connection(await_response = true) if @transport.open? @logger.debug "Transport is still open" @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) if await_response @logger.debug "Waiting for a connection.close-ok..." @last_connection_close_ok = wait_on_continuations end end shut_down_all_consumer_work_pools! maybe_shutdown_heartbeat_sender @status_mutex.synchronize { @status = :not_connected } end |
#close_transport ⇒ Object
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 |
# File 'lib/bunny/session.rb', line 1098 def close_transport begin @transport.close rescue StandardError => e @logger.error "Exception when closing transport:" @logger.error e.class.name @logger.error e. @logger.error e.backtrace end end |
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
448 449 450 |
# File 'lib/bunny/session.rb', line 448 def closed? @status_mutex.synchronize { @status == :closed } end |
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing.
443 444 445 |
# File 'lib/bunny/session.rb', line 443 def closing? @status_mutex.synchronize { @status == :closing } end |
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.
300 301 302 303 304 |
# File 'lib/bunny/session.rb', line 300 def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end |
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
437 438 439 |
# File 'lib/bunny/session.rb', line 437 def connecting? status == :connecting end |
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 |
# File 'lib/bunny/session.rb', line 380 def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n raise ConnectionAlreadyClosed if manually_closed? raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected? @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)) ch.open ch end end end |
#decrement_recovery_attemp_counter! ⇒ Object
838 839 840 841 842 843 844 |
# File 'lib/bunny/session.rb', line 838 def decrement_recovery_attemp_counter! if @recovery_attempts @recovery_attempts -= 1 @logger.debug "#{@recovery_attempts} recovery attempts left" end @recovery_attempts end |
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
534 535 536 537 538 539 540 541 542 543 544 |
# File 'lib/bunny/session.rb', line 534 def exchange_exists?(name) ch = create_channel begin ch.exchange(name, :passive => true) true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#find_channel(number) ⇒ Object
601 602 603 |
# File 'lib/bunny/session.rb', line 601 def find_channel(number) @channels[number] end |
#handle_frame(ch_number, method) ⇒ Object
Handles incoming frames and dispatches them.
Channel methods (‘channel.open-ok`, `channel.close-ok`) are handled by the session itself. Connection level errors result in exceptions being raised. Deliveries and other methods are passed on to channels to dispatch.
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 |
# File 'lib/bunny/session.rb', line 644 def handle_frame(ch_number, method) @logger.debug { "Session#handle_frame on #{ch_number}: #{method.inspect}" } case method when AMQ::Protocol::Channel::OpenOk then @continuations.push(method) when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) when AMQ::Protocol::Connection::Close then if recover_from_connection_close? @logger.warn "Recovering from connection.close (#{method.reply_text})" clean_up_on_shutdown handle_network_failure(instantiate_connection_level_exception(method)) else clean_up_and_fail_on_connection_close!(method) end when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear rescue StandardError => e @logger.error e.class.name @logger.error e. @logger.error e.backtrace ensure @continuations.push(:__unblock__) end when AMQ::Protocol::Connection::Blocked then @blocked = true @block_callback.call(method) if @block_callback when AMQ::Protocol::Connection::Unblocked then @blocked = false @unblock_callback.call(method) if @unblock_callback when AMQ::Protocol::Connection::UpdateSecretOk then @continuations.push(method) when AMQ::Protocol::Channel::Close then begin ch = synchronised_find_channel(ch_number) # this includes sending a channel.close-ok and # potentially invoking a user-provided callback, # avoid doing that while holding a mutex lock. MK. ch.handle_method(method) ensure if ch.nil? @logger.warn "Received a server-sent channel.close but the channel was not found locally. Ignoring the frame." else # synchronises on @channel_mutex under the hood self.unregister_channel(ch) end end when AMQ::Protocol::Basic::GetEmpty then ch = find_channel(ch_number) ch.handle_basic_get_empty(method) else if ch = find_channel(ch_number) ch.handle_method(method) else @logger.warn "Channel #{ch_number} is not open on this connection!" end end end |
#handle_frameset(ch_number, frames) ⇒ Object
711 712 713 714 715 716 717 718 719 720 721 722 723 724 |
# File 'lib/bunny/session.rb', line 711 def handle_frameset(ch_number, frames) method = frames.first case method when AMQ::Protocol::Basic::GetOk then @channels[ch_number].handle_basic_get_ok(*frames) when AMQ::Protocol::Basic::GetEmpty then @channels[ch_number].handle_basic_get_empty(*frames) when AMQ::Protocol::Basic::Return then @channels[ch_number].handle_basic_return(*frames) else @channels[ch_number].handle_frameset(*frames) end end |
#handle_network_failure(exception) ⇒ Object
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 |
# File 'lib/bunny/session.rb', line 732 def handle_network_failure(exception) raise NetworkErrorWrapper.new(exception) unless @threaded @status_mutex.synchronize { @status = :disconnected } if !recovering_from_network_failure? begin @recovering_from_network_failure = true if recoverable_network_failure?(exception) announce_network_failure_recovery @channel_mutex.synchronize do @channels.each do |n, ch| ch.maybe_kill_consumer_work_pool! end end @reader_loop.stop if @reader_loop maybe_shutdown_heartbeat_sender recover_from_network_failure else @logger.error "Exception #{exception.} is considered unrecoverable..." end ensure @recovering_from_network_failure = false end end end |
#heartbeat_from(options) ⇒ Object
1030 1031 1032 |
# File 'lib/bunny/session.rb', line 1030 def heartbeat_from() [:heartbeat] || [:heartbeat_timeout] || [:requested_heartbeat] || [:heartbeat_interval] || DEFAULT_HEARTBEAT end |
#heartbeat_interval ⇒ Integer
Returns Heartbeat timeout (not interval) used.
261 |
# File 'lib/bunny/session.rb', line 261 def heartbeat_interval; self.heartbeat; end |
#heartbeat_timeout ⇒ Integer
Returns Heartbeat timeout used.
264 |
# File 'lib/bunny/session.rb', line 264 def heartbeat_timeout; self.heartbeat; end |
#host ⇒ Object
283 284 285 |
# File 'lib/bunny/session.rb', line 283 def host @transport ? @transport.host : host_from_address(@addresses[@address_index]) end |
#host_from_address(address) ⇒ Object
971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 |
# File 'lib/bunny/session.rb', line 971 def host_from_address(address) # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671 last_colon = address.rindex(":") last_closing_square_bracket = address.rindex("]") if last_closing_square_bracket.nil? parts = address.split(":") # this looks like an unquoted IPv6 address, so emit a warning if parts.size > 2 @logger.warn "Address #{address} looks like an unquoted IPv6 address. Make sure you quote IPv6 addresses like so: [2001:db8:85a3:8d3:1319:8a2e:370:7348]" end return parts[0] end if last_closing_square_bracket < last_colon # there is a port address[0, last_colon] elsif last_closing_square_bracket > last_colon address end end |
#host_with_port?(address) ⇒ Boolean
958 959 960 961 962 963 964 965 966 967 968 |
# File 'lib/bunny/session.rb', line 958 def host_with_port?(address) # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671 last_colon = address.rindex(":") last_closing_square_bracket = address.rindex("]") if last_closing_square_bracket.nil? address.include?(":") else last_closing_square_bracket < last_colon end end |
#hostname ⇒ String
Returns RabbitMQ hostname (or IP address) used.
251 |
# File 'lib/bunny/session.rb', line 251 def hostname; self.host; end |
#inspect ⇒ Object
1212 1213 1214 |
# File 'lib/bunny/session.rb', line 1212 def inspect to_s end |
#instantiate_connection_level_exception(frame) ⇒ Object
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 |
# File 'lib/bunny/session.rb', line 877 def instantiate_connection_level_exception(frame) case frame when AMQ::Protocol::Connection::Close then klass = case frame.reply_code when 320 then ConnectionForced when 501 then FrameError when 503 then CommandInvalid when 504 then ChannelError when 505 then UnexpectedFrame when 506 then ResourceError when 530 then NotAllowedError when 541 then InternalError else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end klass.new("Connection-level error: #{frame.reply_text}", self, frame) end end |
#local_port ⇒ Integer
Returns Client socket port.
307 308 309 |
# File 'lib/bunny/session.rb', line 307 def local_port @transport.local_address.ip_port end |
#manually_closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
453 454 455 |
# File 'lib/bunny/session.rb', line 453 def manually_closed? @status_mutex.synchronize { @manually_closed == true } end |
#maybe_shutdown_reader_loop ⇒ Object
1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 |
# File 'lib/bunny/session.rb', line 1072 def maybe_shutdown_reader_loop if @reader_loop @reader_loop.stop if threaded? # this is the easiest way to wait until the loop # is guaranteed to have terminated @reader_loop.terminate_with(ShutdownSignal) # joining the thread here may take forever # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is # a native method that cannot be (easily) interrupted. # So we use this ugly hack or else our test suite takes forever # to run on JRuby (a new connection is opened/closed per example). MK. if defined?(JRUBY_VERSION) sleep 0.075 else @reader_loop.join end else # single threaded mode, nothing to do. MK. end end @reader_loop = nil end |
#next_channel_id ⇒ Object
1035 1036 1037 |
# File 'lib/bunny/session.rb', line 1035 def next_channel_id @channel_id_allocator.next_channel_id end |
#notify_of_recovery_attempt_start ⇒ Object
862 863 864 |
# File 'lib/bunny/session.rb', line 862 def notify_of_recovery_attempt_start @recovery_attempt_started.call if @recovery_attempt_started end |
#notify_of_recovery_attempts_exhausted ⇒ Object
872 873 874 |
# File 'lib/bunny/session.rb', line 872 def notify_of_recovery_attempts_exhausted @recovery_attempts_exhausted.call if @recovery_attempts_exhausted end |
#notify_of_recovery_completion ⇒ Object
867 868 869 |
# File 'lib/bunny/session.rb', line 867 def notify_of_recovery_completion @recovery_completed.call if @recovery_completed end |
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
477 478 479 |
# File 'lib/bunny/session.rb', line 477 def on_blocked(&block) @block_callback = block end |
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
486 487 488 |
# File 'lib/bunny/session.rb', line 486 def on_unblocked(&block) @unblock_callback = block end |
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
458 459 460 461 462 |
# File 'lib/bunny/session.rb', line 458 def open? @status_mutex.synchronize do (status == :open || status == :connected || status == :connecting) && @transport.open? end end |
#open_channel(ch) ⇒ Object
570 571 572 573 574 575 576 577 578 579 580 581 582 583 |
# File 'lib/bunny/session.rb', line 570 def open_channel(ch) @channel_mutex.synchronize do n = ch.number self.register_channel(ch) @transport_mutex.synchronize do @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) end @last_channel_open_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_channel_open_ok end end |
#password ⇒ String
Returns Password used.
255 |
# File 'lib/bunny/session.rb', line 255 def password; self.pass; end |
#password_from(options) ⇒ Object
1025 1026 1027 |
# File 'lib/bunny/session.rb', line 1025 def password_from() [:password] || [:pass] || [:pwd] || DEFAULT_PASSWORD end |
#port ⇒ Object
287 288 289 |
# File 'lib/bunny/session.rb', line 287 def port @transport ? @transport.port : port_from_address(@addresses[@address_index]) end |
#port_from(options) ⇒ Object
947 948 949 950 951 952 953 954 955 |
# File 'lib/bunny/session.rb', line 947 def port_from() fallback = if [:tls] || [:ssl] AMQ::Protocol::TLS_PORT else AMQ::Protocol::DEFAULT_PORT end .fetch(:port, fallback) end |
#port_from_address(address) ⇒ Object
994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 |
# File 'lib/bunny/session.rb', line 994 def port_from_address(address) # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671 last_colon = address.rindex(":") last_closing_square_bracket = address.rindex("]") if last_closing_square_bracket.nil? parts = address.split(":") # this looks like an unquoted IPv6 address, so emit a warning if parts.size > 2 @logger.warn "Address #{address} looks like an unquoted IPv6 address. Make sure you quote IPv6 addresses like so: [2001:db8:85a3:8d3:1319:8a2e:370:7348]" end return parts[1].to_i end if last_closing_square_bracket < last_colon # there is a port address[(last_colon + 1)..-1].to_i end end |
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
514 515 516 517 518 519 520 521 522 523 524 |
# File 'lib/bunny/session.rb', line 514 def queue_exists?(name) ch = create_channel begin ch.queue(name, :passive => true) true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#raise_if_continuation_resulted_in_a_connection_error! ⇒ Object
706 707 708 |
# File 'lib/bunny/session.rb', line 706 def raise_if_continuation_resulted_in_a_connection_error! raise @last_connection_error if @last_connection_error end |
#reader_loop ⇒ Object
1067 1068 1069 |
# File 'lib/bunny/session.rb', line 1067 def reader_loop @reader_loop ||= ReaderLoop.new(@transport, self, @session_error_handler) end |
#recover_channels ⇒ Object
852 853 854 855 856 857 858 859 |
# File 'lib/bunny/session.rb', line 852 def recover_channels @channel_mutex.synchronize do @channels.each do |n, ch| ch.open ch.recover_from_network_failure end end end |
#recover_from_connection_close? ⇒ Boolean
727 728 729 |
# File 'lib/bunny/session.rb', line 727 def recover_from_connection_close? @recover_from_connection_close end |
#recover_from_network_failure ⇒ Object
780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 |
# File 'lib/bunny/session.rb', line 780 def recover_from_network_failure sleep @network_recovery_interval @logger.debug "Will attempt connection recovery..." notify_of_recovery_attempt_start self.initialize_transport @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}" self.start if open? @recovering_from_network_failure = false @logger.debug "Connection is now open" if @reset_recovery_attempt_counter_after_reconnection @logger.debug "Resetting recovery attempt counter after successful reconnection" reset_recovery_attempt_counter! else @logger.debug "Not resetting recovery attempt counter after successful reconnection, as configured" end recover_channels notify_of_recovery_completion end rescue HostListDepleted reset_address_index retry rescue => e if recoverable_network_failure?(e) @logger.warn "TCP connection failed" if should_retry_recovery? @logger.warn "Reconnecting in #{@network_recovery_interval} seconds" decrement_recovery_attemp_counter! announce_network_failure_recovery retry else @logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts}), giving up" @transport.close self.close(false) @manually_closed = false notify_of_recovery_attempts_exhausted end else raise e end end |
#recoverable_network_failure?(exception) ⇒ Boolean
761 762 763 |
# File 'lib/bunny/session.rb', line 761 def recoverable_network_failure?(exception) @recoverable_exceptions.any? {|x| exception.kind_of? x} end |
#recovering_from_network_failure? ⇒ Boolean
766 767 768 |
# File 'lib/bunny/session.rb', line 766 def recovering_from_network_failure? @recovering_from_network_failure end |
#recovery_attempts_limited? ⇒ Boolean
828 829 830 |
# File 'lib/bunny/session.rb', line 828 def recovery_attempts_limited? !!@max_recovery_attempts end |
#register_channel(ch) ⇒ Object
1045 1046 1047 1048 1049 |
# File 'lib/bunny/session.rb', line 1045 def register_channel(ch) @channel_mutex.synchronize do @channels[ch.number] = ch end end |
#release_channel_id(i) ⇒ Object
1040 1041 1042 |
# File 'lib/bunny/session.rb', line 1040 def release_channel_id(i) @channel_id_allocator.release_channel_id(i) end |
#reset_address_index ⇒ Object
291 292 293 |
# File 'lib/bunny/session.rb', line 291 def reset_address_index @address_index_mutex.synchronize { @address_index = 0 } end |
#reset_recovery_attempt_counter! ⇒ Object
847 848 849 |
# File 'lib/bunny/session.rb', line 847 def reset_recovery_attempt_counter! @recovery_attempts = @max_recovery_attempts end |
#send_frame(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open. Exposed primarily for Bunny::Channel
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 |
# File 'lib/bunny/session.rb', line 1120 def send_frame(frame, signal_activity = true) if open? # @transport_mutex.synchronize do # @transport.write(frame.encode) # end @transport.write(frame.encode) signal_activity! if signal_activity else raise ConnectionClosedError.new(frame) end end |
#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open. Uses transport implementation that does not perform timeout control. Exposed primarily for Bunny::Channel.
1138 1139 1140 1141 1142 1143 1144 1145 |
# File 'lib/bunny/session.rb', line 1138 def send_frame_without_timeout(frame, signal_activity = true) if open? @transport.write_without_timeout(frame.encode) signal_activity! if signal_activity else raise ConnectionClosedError.new(frame) end end |
#send_frameset(frames, channel) ⇒ Object
Sends multiple frames, in one go. For thread safety this method takes a channel object and synchronizes on it.
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 |
# File 'lib/bunny/session.rb', line 1151 def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods technically do not need this kind of synchronization # (no incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK. channel.synchronize do # see rabbitmq/rabbitmq-server#156 if open? data = frames.reduce("") { |acc, frame| acc << frame.encode } @transport.write(data) signal_activity! else raise ConnectionClosedError.new(frames) end end end |
#send_frameset_without_timeout(frames, channel) ⇒ Object
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it. Uses transport implementation that does not perform timeout control.
1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 |
# File 'lib/bunny/session.rb', line 1176 def send_frameset_without_timeout(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK. channel.synchronize do if open? frames.each { |frame| self.send_frame_without_timeout(frame, false) } signal_activity! else raise ConnectionClosedError.new(frames) end end end |
#send_raw_without_timeout(data, channel) ⇒ Object
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 |
# File 'lib/bunny/session.rb', line 1193 def send_raw_without_timeout(data, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do @transport.write(data) signal_activity! end end |
#should_retry_recovery? ⇒ Boolean
833 834 835 |
# File 'lib/bunny/session.rb', line 833 def should_retry_recovery? !recovery_attempts_limited? || @recovery_attempts > 1 end |
#signal_activity! ⇒ Object
1110 1111 1112 |
# File 'lib/bunny/session.rb', line 1110 def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender end |
#start ⇒ Object
Starts the connection process.
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# File 'lib/bunny/session.rb', line 316 def start return self if connected? @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin begin # close existing transport if we have one, # to not leak sockets @transport.maybe_initialize_socket @transport.post_initialize_socket @transport.connect self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if threaded? rescue TCPConnectionFailed => e @logger.warn e. self.initialize_transport @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}" return self.start rescue @status_mutex.synchronize { @status = :not_connected } raise end rescue HostListDepleted self.reset_address_index @status_mutex.synchronize { @status = :not_connected } raise TCPConnectionFailedForAllHosts end @status_mutex.synchronize { @manually_closed = false } self end |
#start_reader_loop ⇒ Object
1062 1063 1064 |
# File 'lib/bunny/session.rb', line 1062 def start_reader_loop reader_loop.start end |
#synchronised_find_channel(number) ⇒ Object
606 607 608 |
# File 'lib/bunny/session.rb', line 606 def synchronised_find_channel(number) @channel_mutex.synchronize { @channels[number] } end |
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
279 280 281 |
# File 'lib/bunny/session.rb', line 279 def threaded? @threaded end |
#to_s ⇒ String
1207 1208 1209 1210 |
# File 'lib/bunny/session.rb', line 1207 def to_s oid = ("0x%x" % (self.object_id << 1)) "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>" end |
#transport_write_timeout ⇒ Integer
Socket operation write timeout used by this connection
371 372 373 |
# File 'lib/bunny/session.rb', line 371 def transport_write_timeout @transport.write_timeout end |
#unregister_channel(ch) ⇒ Object
1052 1053 1054 1055 1056 1057 1058 1059 |
# File 'lib/bunny/session.rb', line 1052 def unregister_channel(ch) @channel_mutex.synchronize do n = ch.number self.release_channel_id(n) @channels.delete(ch.number) end end |
#update_secret(value, reason) ⇒ Object
360 361 362 363 364 365 366 |
# File 'lib/bunny/session.rb', line 360 def update_secret(value, reason) @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason)) @last_update_secret_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_update_secret_ok end |
#username ⇒ String
Returns Username used.
253 |
# File 'lib/bunny/session.rb', line 253 def username; self.user; end |
#username_from(options) ⇒ Object
1020 1021 1022 |
# File 'lib/bunny/session.rb', line 1020 def username_from() [:username] || [:user] || DEFAULT_USER end |
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
273 274 275 |
# File 'lib/bunny/session.rb', line 273 def uses_ssl? @transport.uses_ssl? end |
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
267 268 269 |
# File 'lib/bunny/session.rb', line 267 def uses_tls? @transport.uses_tls? end |
#validate_connection_options(options) ⇒ Object
240 241 242 243 244 245 246 247 248 |
# File 'lib/bunny/session.rb', line 240 def () if [:hosts] && [:addresses] raise ArgumentError, "Connection options can't contain hosts and addresses at the same time" end if ([:host] || [:hostname]) && ([:hosts] || [:addresses]) @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one." end end |
#vhost_from(options) ⇒ Object
1015 1016 1017 |
# File 'lib/bunny/session.rb', line 1015 def vhost_from() [:virtual_host] || [:vhost] || DEFAULT_VHOST end |
#virtual_host ⇒ String
Returns Virtual host used.
257 |
# File 'lib/bunny/session.rb', line 257 def virtual_host; self.vhost; end |
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
425 426 427 428 429 430 431 432 433 434 |
# File 'lib/bunny/session.rb', line 425 def with_channel(n = nil) ch = create_channel(n) begin yield ch ensure ch.close if ch.open? end self end |