Class: Bunny::Session

Inherits:
Object
Defined in:
lib/bunny/session.rb

Overview

Represents an AMQP 0.9.1 connection to a RabbitMQ node.

Constant Summary

DEFAULT_HOST =

Default host used for connection

"127.0.0.1"
DEFAULT_VHOST =

Default virtual host used for connection

"/"
DEFAULT_USER =

Default username used for connection

"guest"
DEFAULT_PASSWORD =

Default password used for connection

"guest"
DEFAULT_HEARTBEAT =

Default heartbeat setting. :server means the value suggested by RabbitMQ is used.

:server
DEFAULT_FRAME_MAX =
131072
CHANNEL_MAX_LIMIT =

Hard limit the user cannot go over regardless of server configuration.

65535
DEFAULT_CHANNEL_MAX =
2047
CONNECT_TIMEOUT =

Retained for backwards compatibility.

Transport::DEFAULT_CONNECTION_TIMEOUT
DEFAULT_CONTINUATION_TIMEOUT =
15000
DEFAULT_CLIENT_PROPERTIES =

RabbitMQ client metadata

{
  :capabilities => {
    :publisher_confirms           => true,
    :consumer_cancel_notify       => true,
    :exchange_exchange_bindings   => true,
    :"basic.nack"                 => true,
    :"connection.blocked"         => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product      => "Bunny",
  :platform     => ::RUBY_DESCRIPTION,
  :version      => Bunny::VERSION,
  :information  => "http://rubybunny.info",
}
DEFAULT_LOCALE =
"en_GB"
DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0
DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]
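
The constructor listing further down combines DEFAULT_CLIENT_PROPERTIES with user-supplied properties via Hash#merge. The sketch below illustrates that merge order on a trimmed-down stand-in hash. SKETCH_DEFAULT_PROPERTIES, merge_client_properties and the sample values are assumptions made for the example, not part of Bunny. Because Hash#merge is shallow, a user-supplied :capabilities hash replaces the default one wholesale rather than being merged key by key.

```ruby
# Trimmed-down stand-in for DEFAULT_CLIENT_PROPERTIES (illustrative values only).
SKETCH_DEFAULT_PROPERTIES = {
  :capabilities => { :publisher_confirms => true, :"basic.nack" => true },
  :product      => "Bunny"
}.freeze

# Mirrors the merge order used by the constructor: defaults first, then
# user-supplied properties, then the connection name.
def merge_client_properties(client_props, connection_name)
  SKETCH_DEFAULT_PROPERTIES.merge(client_props).merge(connection_name: connection_name)
end

merged = merge_client_properties(
  { :product => "my-app", :capabilities => { :publisher_confirms => false } },
  "billing-worker"
)

p merged[:product]
p merged[:capabilities]    # the default :"basic.nack" entry is gone
p merged[:connection_name]
```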

Constructor Details

#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session

Returns a new instance of Session.

Parameters:

  • connection_string_or_opts (String, Hash) (defaults to: ENV['RABBITMQ_URL'])

    Connection string or a hash of connection options

  • optz (Hash) (defaults to: Hash.new)

    Extra options not related to connection

Options Hash (connection_string_or_opts):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    List of hostnames or IP addresses to select from when connecting

  • :addresses (Array<String>) — default: ["127.0.0.1:5672"]

    List of host:port addresses to select from when connecting

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer, Symbol) — default: :server

    Heartbeat timeout to offer to the server. :server means use the value suggested by RabbitMQ. 0 means heartbeats and socket read timeouts will be disabled (not recommended).

  • :network_recovery_interval (Integer, Float) — default: 5.0

    Interval, in seconds, between network recovery attempts. This includes the initial pause after a network failure.

  • :tls (Boolean) — default: false

    Should TLS/SSL be used?

  • :tls_cert (String) — default: nil

    Path to client TLS/SSL certificate file (.pem)

  • :tls_key (String) — default: nil

    Path to client TLS/SSL private key file (.pem)

  • :tls_ca_certificates (Array<String>)

    Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration

  • :verify_peer (Boolean) — default: true

    Whether TLS peer verification should be performed

  • :tls_protocol (Symbol) — default: negotiated

    What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)

  • :channel_max (Integer) — default: 2047

    Maximum number of channels allowed on this connection, minus 1 to account for the special channel 0.

  • :continuation_timeout (Integer) — default: 15000

    Timeout for client operations that expect a response (e.g. Queue#get), in milliseconds.

  • :connection_timeout (Integer) — default: 30

    Timeout in seconds for connecting to the server.

  • :read_timeout (Integer) — default: 30

    TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored.

  • :write_timeout (Integer) — default: 30

    TCP socket write timeout in seconds.

  • :hosts_shuffle_strategy (Proc)

    A callable that reorders a list of host strings. Defaults to Array#shuffle.

  • :logger (Logger)

    The logger. If missing, one is created using :log_file and :log_level.

  • :log_file (IO, String)

    The file or path to use when creating a logger. Defaults to STDOUT.

  • :logfile (IO, String)

    DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.

  • :log_level (Integer)

    The log level to use when creating a logger. Defaults to the BUNNY_LOG_LEVEL environment variable if set, otherwise Logger::WARN.

  • :automatically_recover (Boolean) — default: true

    Should automatically recover from network failures?

  • :recovery_attempts (Integer) — default: nil

    Max number of recovery attempts, nil means forever

  • :reset_recovery_attempts_after_reconnection (Boolean) — default: true

    Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.

  • :recovery_attempt_started (Proc) — default: nil

    Will be called before every connection recovery attempt

  • :recovery_completed (Proc) — default: nil

    Will be called after successful connection recovery

  • :recovery_attempts_exhausted (Proc) — default: nil

    Will be called when connection recovery has failed after the specified number of recovery attempts

  • :recover_from_connection_close (Boolean) — default: true

    Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?

  • :session_error_handler (Object) — default: Thread.current

    Object which responds to #raise that will act as a session error handler. Defaults to Thread.current, which will raise asynchronous exceptions in the thread that created the session.

Options Hash (optz):

  • :auth_mechanism (String) — default: "PLAIN"

    Authentication mechanism, PLAIN or EXTERNAL

  • :locale (String) — default: "en_GB"

    Locale RabbitMQ should use

  • :connection_name (String) — default: nil

    Client-provided connection name, if any. Note that the value returned does not uniquely identify a connection and cannot be used as a connection identifier in HTTP API requests.

# File 'lib/bunny/session.rb', line 142

def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
  opts = case (connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts            = opts
  log_file         = opts[:log_file] || opts[:logfile] || STDOUT
  log_level        = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  # we might need to log a warning about ill-formatted IPv6 address but
  # progname includes hostname, so init like this first
  @logger          = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))

  @addresses       = self.addresses_from(opts)
  @address_index   = 0

  @transport       = nil
  @user            = self.username_from(opts)
  @pass            = self.password_from(opts)
  @vhost           = self.vhost_from(opts)
  @threaded        = opts.fetch(:threaded, true)

  # re-init, see above
  @logger          = opts.fetch(:logger, init_default_logger(log_file, log_level))

  validate_connection_options(opts)
  @last_connection_error = nil

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] | opts[:automatic_recovery]
                           end
  @recovering_from_network_failure = false
  @max_recovery_attempts = opts[:recovery_attempts]
  @recovery_attempts     = @max_recovery_attempts
  # When this is set, connection attempts won't be reset after
  # successful reconnection. Some find this behavior more sensible
  # than the per-failure attempt counter. MK.
  @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout   = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status             = :not_connected
  @manually_closed    = false
  @blocked            = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max   = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be-renegotiated during connection tuning steps. MK.
  @channel_max        = @client_channel_max
  @heartbeat_sender   = nil
  @client_heartbeat   = self.heartbeat_from(opts)

  client_props         = opts[:properties] || opts[:client_properties] || {}
  @connection_name     = client_props[:connection_name] || opts[:connection_name]
  @client_properties   = DEFAULT_CLIENT_PROPERTIES.merge(client_props)
                                                  .merge(connection_name: connection_name)
  @mechanism           = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN"))
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale              = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl          = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex       = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex     = @mutex_impl.new
  @status_mutex        = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels            = Hash.new

  @recovery_attempt_started = opts[:recovery_attempt_started]
  @recovery_completed       = opts[:recovery_completed]
  @recovery_attempts_exhausted          = opts[:recovery_attempts_exhausted]

  @session_error_handler = opts.fetch(:session_error_handler, Thread.current)

  @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup

  self.reset_continuations
  self.initialize_transport

end
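
The first statement of the constructor accepts nil, a connection string, or a hash, and merges optz on top of the result. A minimal standalone sketch of that normalization follows. normalize_session_opts and FAKE_URI_PARSER are hypothetical names introduced for illustration, and the fake parser only strips the scheme; the real parsing is delegated to AMQ::Settings.configure.

```ruby
# Hypothetical stand-in for Bunny::Session.parse_uri. It only strips the
# scheme and is NOT how the real parser behaves.
FAKE_URI_PARSER = lambda { |uri| { :host => uri.sub(%r{\Aamqps?://}, "") } }

# Mirrors the case expression at the top of #initialize: nil, a String or
# a Hash becomes an options hash, then optz is merged on top of it.
def normalize_session_opts(connection_string_or_opts, optz = {}, parser = FAKE_URI_PARSER)
  base = case connection_string_or_opts
         when nil    then {}
         when String then parser.call(connection_string_or_opts)
         when Hash   then connection_string_or_opts
         end
  base.merge(optz)
end

p normalize_session_opts(nil, { :heartbeat => 30 })
p normalize_session_opts("amqp://rabbit.example", { :vhost => "staging" })
p normalize_session_opts({ :host => "a", :vhost => "/" }, { :vhost => "staging" })
```

Keys in optz win over keys from the first argument, which is why :vhost ends up as "staging" in the last call.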

Instance Attribute Details

#channel_id_allocator ⇒ Object (readonly)

Returns the value of attribute channel_id_allocator.



# File 'lib/bunny/session.rb', line 84

def channel_id_allocator
  @channel_id_allocator
end

#channel_max ⇒ Object (readonly)

Returns the value of attribute channel_max.



# File 'lib/bunny/session.rb', line 82

def channel_max
  @channel_max
end

#connection_name ⇒ Object (readonly)

Returns the value of attribute connection_name.



# File 'lib/bunny/session.rb', line 93

def connection_name
  @connection_name
end

#continuation_timeout ⇒ Integer (readonly)

Returns the timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds. Default is 15000.

Returns:

  • (Integer)

    Timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds. Default is 15000.



# File 'lib/bunny/session.rb', line 91

def continuation_timeout
  @continuation_timeout
end

#frame_max ⇒ Object (readonly)

Returns the value of attribute frame_max.



# File 'lib/bunny/session.rb', line 82

def frame_max
  @frame_max
end

#heartbeat ⇒ Object (readonly)

Returns the value of attribute heartbeat.



# File 'lib/bunny/session.rb', line 82

def heartbeat
  @heartbeat
end

#logger ⇒ Logger (readonly)

Returns:

  • (Logger)


# File 'lib/bunny/session.rb', line 89

def logger
  @logger
end

#mechanism ⇒ String (readonly)

Authentication mechanism, e.g. “PLAIN” or “EXTERNAL”

Returns:

  • (String)


# File 'lib/bunny/session.rb', line 87

def mechanism
  @mechanism
end

#mutex_impl ⇒ Object (readonly)



# File 'lib/bunny/session.rb', line 296

def mutex_impl
  @mutex_impl
end

#network_recovery_interval ⇒ Object (readonly)

Returns the value of attribute network_recovery_interval.



# File 'lib/bunny/session.rb', line 92

def network_recovery_interval
  @network_recovery_interval
end

#pass ⇒ Object (readonly)

Returns the value of attribute pass.



# File 'lib/bunny/session.rb', line 82

def pass
  @pass
end

#recoverable_exceptions ⇒ Object

Returns the value of attribute recoverable_exceptions.



# File 'lib/bunny/session.rb', line 95

def recoverable_exceptions
  @recoverable_exceptions
end

#server_authentication_mechanisms ⇒ Object (readonly)

Returns the value of attribute server_authentication_mechanisms.



# File 'lib/bunny/session.rb', line 83

def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilities ⇒ Object (readonly)

Returns the value of attribute server_capabilities.



# File 'lib/bunny/session.rb', line 83

def server_capabilities
  @server_capabilities
end

#server_locales ⇒ Object (readonly)

Returns the value of attribute server_locales.



# File 'lib/bunny/session.rb', line 83

def server_locales
  @server_locales
end

#server_properties ⇒ Object (readonly)

Returns the value of attribute server_properties.



# File 'lib/bunny/session.rb', line 83

def server_properties
  @server_properties
end

#socket_configurator ⇒ Object

Returns the value of attribute socket_configurator.



# File 'lib/bunny/session.rb', line 94

def socket_configurator
  @socket_configurator
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



# File 'lib/bunny/session.rb', line 82

def status
  @status
end

#threaded ⇒ Object (readonly)

Returns the value of attribute threaded.



# File 'lib/bunny/session.rb', line 82

def threaded
  @threaded
end

#transport ⇒ Bunny::Transport (readonly)

Returns:

  • (Bunny::Transport)



# File 'lib/bunny/session.rb', line 81

def transport
  @transport
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



# File 'lib/bunny/session.rb', line 82

def user
  @user
end

#vhost ⇒ Object (readonly)

Returns the value of attribute vhost.



# File 'lib/bunny/session.rb', line 82

def vhost
  @vhost
end

Class Method Details

.parse_uri(uri) ⇒ Hash

Parses an amqp or amqps URI into a hash that #initialize accepts.

Parameters:

  • uri (String)

    amqp or amqps URI to parse

Returns:

  • (Hash)

    Parsed URI as a hash



# File 'lib/bunny/session.rb', line 502

def self.parse_uri(uri)
  AMQ::Settings.configure(uri)
end

Instance Method Details

#addresses_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 932

def addresses_from(options)
  shuffle_strategy = options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy)

  addresses = options[:host] || options[:hostname] || options[:addresses] ||
    options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"]
  addresses = [addresses] unless addresses.is_a? Array

  addrs = addresses.map do |address|
    host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}"
  end

  shuffle_strategy.call(addrs)
end
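
The listing above picks the first of :host, :hostname, :addresses and :hosts that is set, wraps a single value in an array, and appends the configured port to every entry that lacks one. The sketch below reproduces only the port-appending step as standalone code. The sketch_ method names and the default_port parameter are assumptions for the example, and shuffling is left out so the output is deterministic.

```ruby
# Simplified copy of the port check: an address has a port if a colon
# follows the closing bracket (bracketed IPv6) or, for unbracketed
# addresses, if it contains a colon at all.
def sketch_host_with_port?(address)
  last_colon   = address.rindex(":")
  last_bracket = address.rindex("]")
  last_bracket.nil? ? address.include?(":") : last_bracket < last_colon
end

# Wraps a single value in an array and appends default_port to every
# entry that does not already carry a port.
def sketch_normalize_addresses(hosts, default_port = 5672)
  Array(hosts).map do |address|
    sketch_host_with_port?(address) ? address : "#{address}:#{default_port}"
  end
end

p sketch_normalize_addresses("rabbit1")
p sketch_normalize_addresses(["rabbit1", "rabbit2:5671", "[::1]"])
```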

#after_recovery_attempts_exhausted(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called when connection recovery has failed after the specified number of recovery attempts.



# File 'lib/bunny/session.rb', line 561

def after_recovery_attempts_exhausted(&block)
  @recovery_attempts_exhausted = block
end

#after_recovery_completed(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called after successful connection recovery.



# File 'lib/bunny/session.rb', line 554

def after_recovery_completed(&block)
  @recovery_completed = block
end

#announce_network_failure_recovery ⇒ Object



# File 'lib/bunny/session.rb', line 771

def announce_network_failure_recovery
  if recovery_attempts_limited?
    @logger.warn "Will recover from a network failure (#{@recovery_attempts} out of #{@max_recovery_attempts} left)..."
  else
    @logger.warn "Will recover from a network failure (no retry limit)..."
  end
end

#automatically_recover? ⇒ Boolean

Returns true if this connection has automatic recovery from network failure enabled.

Returns:

  • (Boolean)

    true if this connection has automatic recovery from network failure enabled



# File 'lib/bunny/session.rb', line 466

def automatically_recover?
  @automatically_recover
end

#before_recovery_attempt_starts(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called before every connection recovery attempt.



# File 'lib/bunny/session.rb', line 548

def before_recovery_attempt_starts(&block)
  @recovery_attempt_started = block
end

#blocked? ⇒ Boolean

Returns true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or another resource; false otherwise.

Returns:

  • (Boolean)

    true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or another resource; false otherwise

# File 'lib/bunny/session.rb', line 494

def blocked?
  @blocked
end

#clean_up_and_fail_on_connection_close!(method) ⇒ Object



# File 'lib/bunny/session.rb', line 905

def clean_up_and_fail_on_connection_close!(method)
  @last_connection_error = instantiate_connection_level_exception(method)
  @continuations.push(method)

  clean_up_on_shutdown
  if threaded?
    @session_error_handler.raise(@last_connection_error)
  else
    raise @last_connection_error
  end
end

#clean_up_on_shutdown ⇒ Object



# File 'lib/bunny/session.rb', line 917

def clean_up_on_shutdown
  begin
    shut_down_all_consumer_work_pools!
    maybe_shutdown_reader_loop
    maybe_shutdown_heartbeat_sender
  rescue ShutdownSignal => _sse
    # no-op
  rescue Exception => e
    @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
  ensure
    close_transport
  end
end

#close(await_response = true) ⇒ Object Also known as: stop

Closes the connection. This involves closing all of its channels.



# File 'lib/bunny/session.rb', line 398

def close(await_response = true)
  @status_mutex.synchronize { @status = :closing }

  ignoring_io_errors do
    if @transport.open?
      @logger.debug "Transport is still open..."
      close_all_channels

      @logger.debug "Will close all channels...."
      self.close_connection(await_response)
    end

    clean_up_on_shutdown
  end
  @status_mutex.synchronize do
    @status = :closed
    @manually_closed = true
  end
  @logger.debug "Connection is closed"
  true
end

#close_all_channels ⇒ Object



# File 'lib/bunny/session.rb', line 611

def close_all_channels
  @channel_mutex.synchronize do
    @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
      Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
    end
  end
end

#close_channel(ch) ⇒ Object



# File 'lib/bunny/session.rb', line 586

def close_channel(ch)
  @channel_mutex.synchronize do
    n = ch.number

    @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0))
    @last_channel_close_ok = wait_on_continuations
    raise_if_continuation_resulted_in_a_connection_error!

    self.unregister_channel(ch)
    self.release_channel_id(ch.id)
    @last_channel_close_ok
  end
end

#close_connection(await_response = true) ⇒ Object



# File 'lib/bunny/session.rb', line 620

def close_connection(await_response = true)
  if @transport.open?
    @logger.debug "Transport is still open"
    @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))

    if await_response
      @logger.debug "Waiting for a connection.close-ok..."
      @last_connection_close_ok = wait_on_continuations
    end
  end

  shut_down_all_consumer_work_pools!
  maybe_shutdown_heartbeat_sender
  @status_mutex.synchronize { @status = :not_connected }
end

#close_transport ⇒ Object



# File 'lib/bunny/session.rb', line 1098

def close_transport
  begin
    @transport.close
  rescue StandardError => e
    @logger.error "Exception when closing transport:"
    @logger.error e.class.name
    @logger.error e.message
    @logger.error e.backtrace
  end
end

#closed? ⇒ Boolean

Returns true if this AMQP 0.9.1 connection is closed.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is closed



# File 'lib/bunny/session.rb', line 448

def closed?
  @status_mutex.synchronize { @status == :closed }
end

#closing? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if this AMQP 0.9.1 connection is closing.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is closing



# File 'lib/bunny/session.rb', line 443

def closing?
  @status_mutex.synchronize { @status == :closing }
end

#configure_socket(&block) ⇒ Object

Provides a way to fine-tune the socket used by the connection. Accepts a block to which the socket is yielded.

Raises:

  • (ArgumentError)


# File 'lib/bunny/session.rb', line 300

def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport.configure_socket(&block)
end

#connecting? ⇒ Boolean

Returns true if this connection is still not fully open.

Returns:

  • (Boolean)

    true if this connection is still not fully open



# File 'lib/bunny/session.rb', line 437

def connecting?
  status == :connecting
end

#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel

Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).

Returns:

  • (Bunny::Channel)

Raises:

  • (ArgumentError)


# File 'lib/bunny/session.rb', line 380

def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
  raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
  raise ConnectionAlreadyClosed if manually_closed?
  raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected?

  @channel_mutex.synchronize do
    if n && (ch = @channels[n])
      ch
    else
      ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout))
      ch.open
      ch
    end
  end
end

#decrement_recovery_attemp_counter! ⇒ Object



# File 'lib/bunny/session.rb', line 838

def decrement_recovery_attemp_counter!
  if @recovery_attempts
    @recovery_attempts -= 1
    @logger.debug "#{@recovery_attempts} recovery attempts left"
  end
  @recovery_attempts
end
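
The counter above is only decremented when a limit was configured. With :recovery_attempts left at nil it stays nil, which is how "retry forever" is represented. A minimal sketch of that behaviour, using a hypothetical SketchRecoveryCounter class that is not part of Bunny:

```ruby
# Hypothetical helper that isolates the counter logic from the session.
class SketchRecoveryCounter
  attr_reader :remaining

  # max_attempts may be nil, meaning there is no retry limit.
  def initialize(max_attempts)
    @remaining = max_attempts
  end

  # Decrements only when a limit is set and returns the remaining count.
  def decrement!
    @remaining -= 1 if @remaining
    @remaining
  end
end

limited = SketchRecoveryCounter.new(2)
p limited.decrement!
p limited.decrement!

unlimited = SketchRecoveryCounter.new(nil)
p unlimited.decrement!
```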

#exchange_exists?(name) ⇒ Boolean

Checks if an exchange with the given name exists.

Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.

Parameters:

  • name (String)

    Exchange name

Returns:

  • (Boolean)

    true if exchange exists



# File 'lib/bunny/session.rb', line 534

def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end

#find_channel(number) ⇒ Object



# File 'lib/bunny/session.rb', line 601

def find_channel(number)
  @channels[number]
end

#handle_frame(ch_number, method) ⇒ Object

Handles incoming frames and dispatches them.

Channel methods (`channel.open-ok`, `channel.close-ok`) are handled by the session itself. Connection-level errors result in exceptions being raised. Deliveries and other methods are passed on to channels to dispatch.



# File 'lib/bunny/session.rb', line 644

def handle_frame(ch_number, method)
  @logger.debug { "Session#handle_frame on #{ch_number}: #{method.inspect}" }
  case method
  when AMQ::Protocol::Channel::OpenOk then
    @continuations.push(method)
  when AMQ::Protocol::Channel::CloseOk then
    @continuations.push(method)
  when AMQ::Protocol::Connection::Close then
    if recover_from_connection_close?
      @logger.warn "Recovering from connection.close (#{method.reply_text})"
      clean_up_on_shutdown
      handle_network_failure(instantiate_connection_level_exception(method))
    else
      clean_up_and_fail_on_connection_close!(method)
    end
  when AMQ::Protocol::Connection::CloseOk then
    @last_connection_close_ok = method
    begin
      @continuations.clear
    rescue StandardError => e
      @logger.error e.class.name
      @logger.error e.message
      @logger.error e.backtrace
    ensure
      @continuations.push(:__unblock__)
    end
  when AMQ::Protocol::Connection::Blocked then
    @blocked = true
    @block_callback.call(method) if @block_callback
  when AMQ::Protocol::Connection::Unblocked then
    @blocked = false
    @unblock_callback.call(method) if @unblock_callback
  when AMQ::Protocol::Connection::UpdateSecretOk then
    @continuations.push(method)
  when AMQ::Protocol::Channel::Close then
    begin
      ch = synchronised_find_channel(ch_number)
      # this includes sending a channel.close-ok and
      # potentially invoking a user-provided callback,
      # avoid doing that while holding a mutex lock. MK.
      ch.handle_method(method)
    ensure
      if ch.nil?
        @logger.warn "Received a server-sent channel.close but the channel was not found locally. Ignoring the frame."
      else
        # synchronises on @channel_mutex under the hood
        self.unregister_channel(ch)
      end
    end
  when AMQ::Protocol::Basic::GetEmpty then
    ch = find_channel(ch_number)
    ch.handle_basic_get_empty(method)
  else
    if ch = find_channel(ch_number)
      ch.handle_method(method)
    else
      @logger.warn "Channel #{ch_number} is not open on this connection!"
    end
  end
end

#handle_frameset(ch_number, frames) ⇒ Object



# File 'lib/bunny/session.rb', line 711

def handle_frameset(ch_number, frames)
  method = frames.first

  case method
  when AMQ::Protocol::Basic::GetOk then
    @channels[ch_number].handle_basic_get_ok(*frames)
  when AMQ::Protocol::Basic::GetEmpty then
    @channels[ch_number].handle_basic_get_empty(*frames)
  when AMQ::Protocol::Basic::Return then
    @channels[ch_number].handle_basic_return(*frames)
  else
    @channels[ch_number].handle_frameset(*frames)
  end
end

#handle_network_failure(exception) ⇒ Object



# File 'lib/bunny/session.rb', line 732

def handle_network_failure(exception)
  raise NetworkErrorWrapper.new(exception) unless @threaded

  @status_mutex.synchronize { @status = :disconnected }

  if !recovering_from_network_failure?
    begin
      @recovering_from_network_failure = true
      if recoverable_network_failure?(exception)
        announce_network_failure_recovery
        @channel_mutex.synchronize do
          @channels.each do |n, ch|
            ch.maybe_kill_consumer_work_pool!
          end
        end
        @reader_loop.stop if @reader_loop
        maybe_shutdown_heartbeat_sender

        recover_from_network_failure
      else
        @logger.error "Exception #{exception.message} is considered unrecoverable..."
      end
    ensure
      @recovering_from_network_failure = false
    end
  end
end
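
The listing above uses a boolean flag with begin/ensure so that a failure reported while recovery is already in progress does not start a second recovery, and so that the flag is always cleared afterwards. The sketch below isolates that guard pattern. SketchRecoveryGuard and its method names are hypothetical, and, as in the listing, the flag itself is not protected by a mutex.

```ruby
# Hypothetical class showing only the re-entrancy guard.
class SketchRecoveryGuard
  attr_reader :runs

  def initialize
    @recovering = false
    @runs       = 0
  end

  # Runs the block unless a recovery is already in progress. The flag
  # is reset in ensure, so it is cleared even if the block raises.
  def recover
    return :skipped if @recovering

    begin
      @recovering = true
      @runs += 1
      yield self
    ensure
      @recovering = false
    end
  end
end

guard = SketchRecoveryGuard.new
# The nested call is skipped because the outer recovery is still running.
guard.recover { |g| g.recover { puts "never printed" } }
p guard.runs
```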

#heartbeat_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 1030

def heartbeat_from(options)
  options[:heartbeat] || options[:heartbeat_timeout] || options[:requested_heartbeat] || options[:heartbeat_interval] || DEFAULT_HEARTBEAT
end
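
The fallback chain above accepts several aliases for the same setting and relies on Ruby's || operator, which skips only nil and false. An explicit 0 (heartbeats disabled) is therefore kept rather than replaced by the default. A standalone sketch, where sketch_heartbeat_from and SKETCH_DEFAULT_HEARTBEAT are names assumed for the example:

```ruby
SKETCH_DEFAULT_HEARTBEAT = :server

# Same fallback chain as #heartbeat_from: the first key whose value is
# neither nil nor false wins, otherwise the default is used.
def sketch_heartbeat_from(options)
  options[:heartbeat] || options[:heartbeat_timeout] ||
    options[:requested_heartbeat] || options[:heartbeat_interval] ||
    SKETCH_DEFAULT_HEARTBEAT
end

p sketch_heartbeat_from({})
p sketch_heartbeat_from({ :heartbeat_timeout => 20 })
# 0 is truthy in Ruby, so it takes precedence over later aliases.
p sketch_heartbeat_from({ :heartbeat => 0, :heartbeat_interval => 60 })
```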

#heartbeat_interval ⇒ Integer

Deprecated.

Returns the heartbeat timeout (not interval) used.

Returns:

  • (Integer)

    Heartbeat timeout (not interval) used



# File 'lib/bunny/session.rb', line 261

def heartbeat_interval; self.heartbeat; end

#heartbeat_timeout ⇒ Integer

Returns the heartbeat timeout used.

Returns:

  • (Integer)

    Heartbeat timeout used



# File 'lib/bunny/session.rb', line 264

def heartbeat_timeout; self.heartbeat; end

#host ⇒ Object



# File 'lib/bunny/session.rb', line 283

def host
  @transport ? @transport.host : host_from_address(@addresses[@address_index])
end

#host_from_address(address) ⇒ Object



# File 'lib/bunny/session.rb', line 971

def host_from_address(address)
  # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671
  last_colon                  = address.rindex(":")
  last_closing_square_bracket = address.rindex("]")

  if last_closing_square_bracket.nil?
    parts = address.split(":")
    # this looks like an unquoted IPv6 address, so emit a warning
    if parts.size > 2
      @logger.warn "Address #{address} looks like an unquoted IPv6 address. Make sure you quote IPv6 addresses like so: [2001:db8:85a3:8d3:1319:8a2e:370:7348]"
    end
    return parts[0]
  end

  if last_closing_square_bracket < last_colon
    # there is a port
    address[0, last_colon]
  elsif last_closing_square_bracket > last_colon
    address
  end
end
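
To make the branches above concrete, the following standalone sketch applies the same rules to a few sample addresses. sketch_host_from_address is a hypothetical copy with the logging removed, and the sample addresses are illustrative.

```ruby
# Standalone rendition of the host extraction logic, without logging.
def sketch_host_from_address(address)
  last_colon   = address.rindex(":")
  last_bracket = address.rindex("]")

  # No brackets: everything before the first colon is the host.
  return address.split(":").first if last_bracket.nil?

  # Bracketed IPv6: strip the port only if a colon follows the bracket.
  last_bracket < last_colon ? address[0, last_colon] : address
end

p sketch_host_from_address("rabbit.example:5672")
p sketch_host_from_address("[2001:db8::1]:5671")
p sketch_host_from_address("[2001:db8::1]")
```

Note that the brackets are kept in the returned IPv6 host, as in the listing.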

#host_with_port?(address) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 958

def host_with_port?(address)
  # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671
  last_colon                  = address.rindex(":")
  last_closing_square_bracket = address.rindex("]")

  if last_closing_square_bracket.nil?
    address.include?(":")
  else
    last_closing_square_bracket < last_colon
  end
end

#hostname ⇒ String

Returns the RabbitMQ hostname (or IP address) used.

Returns:

  • (String)

    RabbitMQ hostname (or IP address) used



# File 'lib/bunny/session.rb', line 251

def hostname;     self.host;  end

#inspect ⇒ Object



# File 'lib/bunny/session.rb', line 1212

def inspect
  to_s
end

#instantiate_connection_level_exception(frame) ⇒ Object



# File 'lib/bunny/session.rb', line 877

def instantiate_connection_level_exception(frame)
  case frame
  when AMQ::Protocol::Connection::Close then
    klass = case frame.reply_code
            when 320 then
              ConnectionForced
            when 501 then
              FrameError
            when 503 then
              CommandInvalid
            when 504 then
              ChannelError
            when 505 then
              UnexpectedFrame
            when 506 then
              ResourceError
            when 530 then
              NotAllowedError
            when 541 then
              InternalError
            else
              raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
            end

    klass.new("Connection-level error: #{frame.reply_text}", self, frame)
  end
end
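
The case expression above is effectively a lookup table from AMQP reply codes to exception classes. As an illustrative restatement only (the constant and helper names below are hypothetical, and class names are kept as strings so the sketch runs without Bunny loaded):

```ruby
# Reply codes handled by the case expression above, keyed to the class names it selects.
# This table is an illustration, not a data structure that Bunny defines.
CONNECTION_CLOSE_CLASSES = {
  320 => "ConnectionForced",
  501 => "FrameError",
  503 => "CommandInvalid",
  504 => "ChannelError",
  505 => "UnexpectedFrame",
  506 => "ResourceError",
  530 => "NotAllowedError",
  541 => "InternalError"
}.freeze

# Hypothetical helper: returns the class name, or raises like the `else` branch does.
def connection_close_class_name(reply_code, reply_text = "")
  CONNECTION_CLOSE_CLASSES.fetch(reply_code) do
    raise "Unknown reply code: #{reply_code}, text: #{reply_text}"
  end
end

puts connection_close_class_name(320)
puts connection_close_class_name(530)
```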

#local_port ⇒ Integer

Returns the client socket port.

Returns:

  • (Integer)

    Client socket port



# File 'lib/bunny/session.rb', line 307

def local_port
  @transport.local_address.ip_port
end

#manually_closed? ⇒ Boolean

Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server)



# File 'lib/bunny/session.rb', line 453

def manually_closed?
  @status_mutex.synchronize { @manually_closed == true }
end

#maybe_shutdown_reader_loop ⇒ Object



# File 'lib/bunny/session.rb', line 1072

def maybe_shutdown_reader_loop
  if @reader_loop
    @reader_loop.stop
    if threaded?
      # this is the easiest way to wait until the loop
      # is guaranteed to have terminated
      @reader_loop.terminate_with(ShutdownSignal)
      # joining the thread here may take forever
      # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
      # a native method that cannot be (easily) interrupted.
      # So we use this ugly hack or else our test suite takes forever
      # to run on JRuby (a new connection is opened/closed per example). MK.
      if defined?(JRUBY_VERSION)
        sleep 0.075
      else
        @reader_loop.join
      end
    else
      # single threaded mode, nothing to do. MK.
    end
  end

  @reader_loop = nil
end

#next_channel_id ⇒ Object



# File 'lib/bunny/session.rb', line 1035

def next_channel_id
  @channel_id_allocator.next_channel_id
end

#notify_of_recovery_attempt_start ⇒ Object



# File 'lib/bunny/session.rb', line 862

def notify_of_recovery_attempt_start
  @recovery_attempt_started.call if @recovery_attempt_started
end

#notify_of_recovery_attempts_exhausted ⇒ Object



# File 'lib/bunny/session.rb', line 872

def notify_of_recovery_attempts_exhausted
  @recovery_attempts_exhausted.call if @recovery_attempts_exhausted
end

#notify_of_recovery_completion ⇒ Object



# File 'lib/bunny/session.rb', line 867

def notify_of_recovery_completion
  @recovery_completed.call if @recovery_completed
end

#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object

Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).

Yields:

  • (AMQ::Protocol::Connection::Blocked)

    connection.blocked method which provides a reason for blocking



# File 'lib/bunny/session.rb', line 477

def on_blocked(&block)
  @block_callback = block
end

#on_unblocked(&block) ⇒ Object

Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.




# File 'lib/bunny/session.rb', line 486

def on_unblocked(&block)
  @unblock_callback = block
end
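
Both methods only store the block; the session invokes it later when the broker sends connection.blocked or connection.unblocked. A minimal sketch of that pattern, using a stand-in class rather than a live connection (`BlockAwareSketch`, `simulate_blocked`, and `simulate_unblocked` are hypothetical names, and the reason is passed as a plain string instead of a protocol frame):

```ruby
# Stand-in that stores callbacks the same way on_blocked / on_unblocked do.
# The simulate_* methods are hypothetical; in Bunny the callbacks are triggered
# by frames arriving from the broker, not by user code.
class BlockAwareSketch
  def on_blocked(&block)
    @block_callback = block
  end

  def on_unblocked(&block)
    @unblock_callback = block
  end

  def simulate_blocked(reason)
    @block_callback.call(reason) if @block_callback
  end

  def simulate_unblocked
    @unblock_callback.call if @unblock_callback
  end
end

paused = false
conn   = BlockAwareSketch.new

# A publisher could use a flag like this to stop publishing while blocked.
conn.on_blocked   { |reason| paused = true;  puts "blocked: #{reason}" }
conn.on_unblocked { paused = false; puts "unblocked" }

conn.simulate_blocked("low on memory")
conn.simulate_unblocked
```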

#open? ⇒ Boolean Also known as: connected?

Returns true if this AMQP 0.9.1 connection is open.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is open



# File 'lib/bunny/session.rb', line 458

def open?
  @status_mutex.synchronize do
    (status == :open || status == :connected || status == :connecting) && @transport.open?
  end
end

#open_channel(ch) ⇒ Object



# File 'lib/bunny/session.rb', line 570

def open_channel(ch)
  @channel_mutex.synchronize do
    n = ch.number
    self.register_channel(ch)

    @transport_mutex.synchronize do
      @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING))
    end
    @last_channel_open_ok = wait_on_continuations
    raise_if_continuation_resulted_in_a_connection_error!

    @last_channel_open_ok
  end
end

#password ⇒ String

Returns the password used.

Returns:

  • (String)

    Password used



# File 'lib/bunny/session.rb', line 255

def password;     self.pass;  end

#password_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 1025

def password_from(options)
  options[:password] || options[:pass] || options[:pwd] || DEFAULT_PASSWORD
end
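
The lookup order matters when more than one alias is supplied. A minimal sketch, assuming a standalone copy of the method and the "guest" default listed under DEFAULT_PASSWORD:

```ruby
DEFAULT_PASSWORD = "guest"

# Standalone copy of the alias lookup above, for illustration only.
def password_from(options)
  options[:password] || options[:pass] || options[:pwd] || DEFAULT_PASSWORD
end

# :pass wins over :pwd because it is checked first.
p password_from({ pass: "s3cret", pwd: "ignored" })
# No alias given: the default is used.
p password_from({})
```

Because the chain uses `||`, a key that is present but set to `nil` or `false` falls through to the next alias.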

#port ⇒ Object



# File 'lib/bunny/session.rb', line 287

def port
  @transport ? @transport.port : port_from_address(@addresses[@address_index])
end

#port_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 947

def port_from(options)
  fallback = if options[:tls] || options[:ssl]
               AMQ::Protocol::TLS_PORT
             else
               AMQ::Protocol::DEFAULT_PORT
             end

  options.fetch(:port, fallback)
end
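
A short sketch of how the fallback plays out, assuming the standard AMQP port numbers (5672 for plain TCP, 5671 for TLS) stand in for the AMQ::Protocol constants. The constant names below are local to the example:

```ruby
# Standard AMQP 0-9-1 ports, used here in place of the AMQ::Protocol constants.
SKETCH_TLS_PORT     = 5671
SKETCH_DEFAULT_PORT = 5672

# Standalone copy of the fallback logic shown above, for illustration only.
def port_from(options)
  fallback = (options[:tls] || options[:ssl]) ? SKETCH_TLS_PORT : SKETCH_DEFAULT_PORT
  options.fetch(:port, fallback)
end

p port_from({})
p port_from({ tls: true })
p port_from({ tls: true, port: 5673 })
p port_from({ port: nil })
```

Since `fetch` only falls back when the key is missing, an explicit `port: nil` is returned as `nil` rather than replaced by the default.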

#port_from_address(address) ⇒ Object



# File 'lib/bunny/session.rb', line 994

def port_from_address(address)
  # we need to handle cases such as [2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671
  last_colon                  = address.rindex(":")
  last_closing_square_bracket = address.rindex("]")

  if last_closing_square_bracket.nil?
    parts = address.split(":")
    # this looks like an unquoted IPv6 address, so emit a warning
    if parts.size > 2
      @logger.warn "Address #{address} looks like an unquoted IPv6 address. Make sure you quote IPv6 addresses like so: [2001:db8:85a3:8d3:1319:8a2e:370:7348]"
    end
    return parts[1].to_i
  end

  if last_closing_square_bracket < last_colon
    # there is a port
    address[(last_colon + 1)..-1].to_i
  end
end
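
The return value differs depending on whether the address is bracketed. The following standalone sketch (the `port_part` name is hypothetical, and the logger warning is omitted) shows the cases, including what happens when no port is present:

```ruby
# Hypothetical standalone helper mirroring the branches of port_from_address.
def port_part(address)
  last_colon   = address.rindex(":")
  last_bracket = address.rindex("]")

  # No brackets: take whatever follows the first colon; nil.to_i is 0.
  return address.split(":")[1].to_i if last_bracket.nil?

  # Bracketed address: only "[ipv6]:port" yields a port, otherwise nil.
  address[(last_colon + 1)..-1].to_i if last_bracket < last_colon
end

p port_part("rabbit.local:5672")
p port_part("[2001:db8::1]:5671")
p port_part("localhost")
p port_part("[2001:db8::1]")
```

An unbracketed address without a port yields 0, while a bracketed address without a port yields `nil`.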

#queue_exists?(name) ⇒ Boolean

Checks if a queue with the given name exists.

Implemented using queue.declare with passive set to true on a one-off (short-lived) channel.

Parameters:

  • name (String)

    Queue name

Returns:

  • (Boolean)

    true if queue exists



# File 'lib/bunny/session.rb', line 514

def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end

#raise_if_continuation_resulted_in_a_connection_error! ⇒ Object

Raises:

  • (@last_connection_error)


# File 'lib/bunny/session.rb', line 706

def raise_if_continuation_resulted_in_a_connection_error!
  raise @last_connection_error if @last_connection_error
end

#reader_loop ⇒ Object



# File 'lib/bunny/session.rb', line 1067

def reader_loop
  @reader_loop ||= ReaderLoop.new(@transport, self, @session_error_handler)
end

#recover_channels ⇒ Object



# File 'lib/bunny/session.rb', line 852

def recover_channels
  @channel_mutex.synchronize do
    @channels.each do |n, ch|
      ch.open
      ch.recover_from_network_failure
    end
  end
end

#recover_from_connection_close? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 727

def recover_from_connection_close?
  @recover_from_connection_close
end

#recover_from_network_failure ⇒ Object



# File 'lib/bunny/session.rb', line 780

def recover_from_network_failure
  sleep @network_recovery_interval
  @logger.debug "Will attempt connection recovery..."
  notify_of_recovery_attempt_start

  self.initialize_transport

  @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
  self.start

  if open?

    @recovering_from_network_failure = false
    @logger.debug "Connection is now open"
    if @reset_recovery_attempt_counter_after_reconnection
      @logger.debug "Resetting recovery attempt counter after successful reconnection"
      reset_recovery_attempt_counter!
    else
      @logger.debug "Not resetting recovery attempt counter after successful reconnection, as configured"
    end

    recover_channels
    notify_of_recovery_completion
  end
rescue HostListDepleted
  reset_address_index
  retry
rescue => e
  if recoverable_network_failure?(e)
    @logger.warn "TCP connection failed"
    if should_retry_recovery?
      @logger.warn "Reconnecting in #{@network_recovery_interval} seconds"
      decrement_recovery_attemp_counter!
      announce_network_failure_recovery
      retry
    else
      @logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts}), giving up"
      @transport.close
      self.close(false)
      @manually_closed = false
      notify_of_recovery_attempts_exhausted
    end
  else
    raise e
  end
end
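
The control flow above relies on Ruby's `retry` inside a method-level `rescue`, which re-runs the method body from the top. The sketch below is a simplified model of that loop with no sockets, no sleeping, and no host list. `RecoverySketch` and its arguments are hypothetical, and the decrement step is an assumption about what `decrement_recovery_attemp_counter!` does, since its body is not shown here.

```ruby
# Hypothetical, simplified model of the retry loop above.
class RecoverySketch
  attr_reader :attempts_made

  def initialize(max_recovery_attempts, failures_before_success)
    @max_recovery_attempts = max_recovery_attempts   # nil means unlimited
    @recovery_attempts     = max_recovery_attempts
    @failures_left         = failures_before_success
    @attempts_made         = 0
  end

  # Same condition as should_retry_recovery?.
  def should_retry_recovery?
    @max_recovery_attempts.nil? || @recovery_attempts > 1
  end

  def recover
    @attempts_made += 1
    if @failures_left > 0
      @failures_left -= 1
      raise IOError, "connection refused"   # stands in for a TCP failure
    end
    :recovered
  rescue IOError
    if should_retry_recovery?
      # Assumed behaviour of the counter decrement.
      @recovery_attempts -= 1 if @recovery_attempts
      retry   # re-runs the method body from the top
    else
      :gave_up
    end
  end
end

ok = RecoverySketch.new(3, 2)
puts "#{ok.recover} after #{ok.attempts_made} attempts"

bad = RecoverySketch.new(3, 5)
puts "#{bad.recover} after #{bad.attempts_made} attempts"
```

Under these assumptions, a limit of 3 allows three attempts in total, because the check is `> 1` rather than `> 0`.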

#recoverable_network_failure?(exception) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 761

def recoverable_network_failure?(exception)
  @recoverable_exceptions.any? {|x| exception.kind_of? x}
end
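
Because the check uses `kind_of?`, subclasses of a listed exception also count as recoverable. A minimal sketch, assuming a standalone helper (`recoverable?` is a hypothetical name) that takes the list as an argument:

```ruby
require "timeout"

# Hypothetical standalone version of the check above.
def recoverable?(exception, recoverable_exceptions)
  recoverable_exceptions.any? { |klass| exception.kind_of?(klass) }
end

narrow = [SystemCallError, Timeout::Error]

# Errno::ECONNREFUSED is a subclass of SystemCallError, so it matches.
p recoverable?(Errno::ECONNREFUSED.new, narrow)
# ArgumentError is unrelated to either class in the narrow list.
p recoverable?(ArgumentError.new, narrow)
# With StandardError in the list, almost any ordinary error matches.
p recoverable?(ArgumentError.new, [StandardError])
```

This is worth keeping in mind because DEFAULT_RECOVERABLE_EXCEPTIONS, as listed in the constant summary, includes StandardError.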

#recovering_from_network_failure? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 766

def recovering_from_network_failure?
  @recovering_from_network_failure
end

#recovery_attempts_limited? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 828

def recovery_attempts_limited?
  !!@max_recovery_attempts
end

#register_channel(ch) ⇒ Object



# File 'lib/bunny/session.rb', line 1045

def register_channel(ch)
  @channel_mutex.synchronize do
    @channels[ch.number] = ch
  end
end

#release_channel_id(i) ⇒ Object



# File 'lib/bunny/session.rb', line 1040

def release_channel_id(i)
  @channel_id_allocator.release_channel_id(i)
end

#reset_address_index ⇒ Object



# File 'lib/bunny/session.rb', line 291

def reset_address_index
  @address_index_mutex.synchronize { @address_index = 0 }
end

#reset_recovery_attempt_counter! ⇒ Object



# File 'lib/bunny/session.rb', line 847

def reset_recovery_attempt_counter!
  @recovery_attempts = @max_recovery_attempts
end

#send_frame(frame, signal_activity = true) ⇒ Object

Sends a frame to the peer, checking that the connection is open. Exposed primarily for Bunny::Channel.



# File 'lib/bunny/session.rb', line 1120

def send_frame(frame, signal_activity = true)
  if open?
    # @transport_mutex.synchronize do
    #   @transport.write(frame.encode)
    # end
    @transport.write(frame.encode)
    signal_activity! if signal_activity
  else
    raise ConnectionClosedError.new(frame)
  end
end

#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object

Sends a frame to the peer, checking that the connection is open. Uses a transport implementation that does not perform timeout control. Exposed primarily for Bunny::Channel.



# File 'lib/bunny/session.rb', line 1138

def send_frame_without_timeout(frame, signal_activity = true)
  if open?
    @transport.write_without_timeout(frame.encode)
    signal_activity! if signal_activity
  else
    raise ConnectionClosedError.new(frame)
  end
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames in one go. For thread safety, this method takes a channel object and synchronizes on it.



# File 'lib/bunny/session.rb', line 1151

def send_frameset(frames, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods technically do not need this kind of synchronization
  # (incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we
  # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK.
  channel.synchronize do
    # see rabbitmq/rabbitmq-server#156
    if open?
      data = frames.reduce("") { |acc, frame| acc << frame.encode }
      @transport.write(data)
      signal_activity!
    else
      raise ConnectionClosedError.new(frames)
    end
  end
end
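
The key step is concatenating all encoded frames into one buffer and issuing a single write while holding the channel lock. A minimal sketch with stand-in objects (`FakeFrame`, `RecordingTransport`, and the plain `Mutex` are hypothetical substitutes for Bunny's frames, transport, and `channel.synchronize`):

```ruby
# Stand-in for an AMQP frame; only #encode matters here. Not a Bunny class.
FakeFrame = Struct.new(:payload) do
  def encode
    payload
  end
end

# Stand-in transport that records each write call.
class RecordingTransport
  attr_reader :writes

  def initialize
    @writes = []
  end

  def write(data)
    @writes << data
  end
end

frames    = [FakeFrame.new("method|"), FakeFrame.new("header|"), FakeFrame.new("body")]
transport = RecordingTransport.new
lock      = Mutex.new   # plays the role of channel.synchronize

lock.synchronize do
  # Concatenate first, then write once, so the frameset goes out as a single buffer.
  # String.new gives a mutable accumulator even under frozen_string_literal.
  data = frames.reduce(String.new) { |acc, frame| acc << frame.encode }
  transport.write(data)
end

p transport.writes.size
p transport.writes.first
```

Three frames result in one `write` call, which is the difference from the frame-by-frame approach.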

#send_frameset_without_timeout(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety, this method takes a channel object and synchronizes on it. Uses a transport implementation that does not perform timeout control.



# File 'lib/bunny/session.rb', line 1176

def send_frameset_without_timeout(frames, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK.
  channel.synchronize do
    if open?
      frames.each { |frame| self.send_frame_without_timeout(frame, false) }
      signal_activity!
    else
      raise ConnectionClosedError.new(frames)
    end
  end
end

#send_raw_without_timeout(data, channel) ⇒ Object



# File 'lib/bunny/session.rb', line 1193

def send_raw_without_timeout(data, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    @transport.write(data)
    signal_activity!
  end
end

#should_retry_recovery? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 833

def should_retry_recovery?
  !recovery_attempts_limited? || @recovery_attempts > 1
end

#signal_activity! ⇒ Object



# File 'lib/bunny/session.rb', line 1110

def signal_activity!
  @heartbeat_sender.signal_activity! if @heartbeat_sender
end

#start ⇒ Object

Starts the connection process.



# File 'lib/bunny/session.rb', line 316

def start
  return self if connected?

  @status_mutex.synchronize { @status = :connecting }
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked       = false
  self.reset_continuations

  begin
    begin
      # close existing transport if we have one,
      # to not leak sockets
      @transport.maybe_initialize_socket

      @transport.post_initialize_socket
      @transport.connect

      self.init_connection
      self.open_connection

      @reader_loop = nil
      self.start_reader_loop if threaded?

    rescue TCPConnectionFailed => e
      @logger.warn e.message
      self.initialize_transport
      @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}"

      return self.start
    rescue
      @status_mutex.synchronize { @status = :not_connected }
      raise
    end
  rescue HostListDepleted
    self.reset_address_index
    @status_mutex.synchronize { @status = :not_connected }
    raise TCPConnectionFailedForAllHosts
  end
  @status_mutex.synchronize { @manually_closed = false }

  self
end

#start_reader_loop ⇒ Object



# File 'lib/bunny/session.rb', line 1062

def start_reader_loop
  reader_loop.start
end

#synchronised_find_channel(number) ⇒ Object



# File 'lib/bunny/session.rb', line 606

def synchronised_find_channel(number)
  @channel_mutex.synchronize { @channels[number] }
end

#threaded? ⇒ Boolean

Returns true if this connection uses a separate thread for I/O activity.

Returns:

  • (Boolean)

    true if this connection uses a separate thread for I/O activity



# File 'lib/bunny/session.rb', line 279

def threaded?
  @threaded
end

#to_s ⇒ String

Returns:

  • (String)


# File 'lib/bunny/session.rb', line 1207

def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end

#transport_write_timeout ⇒ Integer

Socket operation write timeout used by this connection.

Returns:

  • (Integer)


# File 'lib/bunny/session.rb', line 371

def transport_write_timeout
  @transport.write_timeout
end

#unregister_channel(ch) ⇒ Object



# File 'lib/bunny/session.rb', line 1052

def unregister_channel(ch)
  @channel_mutex.synchronize do
    n = ch.number

    self.release_channel_id(n)
    @channels.delete(ch.number)
  end
end

#update_secret(value, reason) ⇒ Object



# File 'lib/bunny/session.rb', line 360

def update_secret(value, reason)
  @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason))
  @last_update_secret_ok = wait_on_continuations
  raise_if_continuation_resulted_in_a_connection_error!

  @last_update_secret_ok
end

#username ⇒ String

Returns the username used.

Returns:

  • (String)

    Username used



# File 'lib/bunny/session.rb', line 253

def username;     self.user;  end

#username_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 1020

def username_from(options)
  options[:username] || options[:user] || DEFAULT_USER
end

#uses_ssl? ⇒ Boolean Also known as: ssl?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



# File 'lib/bunny/session.rb', line 273

def uses_ssl?
  @transport.uses_ssl?
end

#uses_tls? ⇒ Boolean Also known as: tls?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



# File 'lib/bunny/session.rb', line 267

def uses_tls?
  @transport.uses_tls?
end

#validate_connection_options(options) ⇒ Object



# File 'lib/bunny/session.rb', line 240

def validate_connection_options(options)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one."
  end
end

#vhost_from(options) ⇒ Object



# File 'lib/bunny/session.rb', line 1015

def vhost_from(options)
  options[:virtual_host] || options[:vhost] || DEFAULT_VHOST
end

#virtual_host ⇒ String

Returns the virtual host used.

Returns:

  • (String)

    Virtual host used



# File 'lib/bunny/session.rb', line 257

def virtual_host; self.vhost; end

#with_channel(n = nil) ⇒ Bunny::Session

Creates a temporary channel, yields it to the block given to this method and closes it.

Returns:

  • (Bunny::Session)



# File 'lib/bunny/session.rb', line 425

def with_channel(n = nil)
  ch = create_channel(n)
  begin
    yield ch
  ensure
    ch.close if ch.open?
  end

  self
end
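
The `ensure` clause is what guarantees the temporary channel is closed even when the block raises. The sketch below reproduces the method on stand-in classes (`FakeChannel` and `FakeSession` are hypothetical and only mimic the calls used above), so it runs without a broker:

```ruby
# Stand-ins for illustration only; these are not Bunny classes.
class FakeChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

class FakeSession
  attr_reader :last_channel

  def create_channel(_n = nil)
    @last_channel = FakeChannel.new
  end

  # Same shape as the with_channel method shown above.
  def with_channel(n = nil)
    ch = create_channel(n)
    begin
      yield ch
    ensure
      ch.close if ch.open?
    end

    self
  end
end

session = FakeSession.new
begin
  session.with_channel { |_ch| raise "publish failed" }
rescue RuntimeError => e
  puts "block raised: #{e.message}"
end
puts "channel still open? #{session.last_channel.open?}"
```

The exception still propagates to the caller; only the channel cleanup is guaranteed. On success the method returns the session itself, so calls can be chained.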