Class: Bunny::Session
- Inherits: Object
- Defined in: lib/bunny/session.rb
Overview
Represents an AMQP 0.9.1 connection to a RabbitMQ node.
Constant Summary
- DEFAULT_HOST = "127.0.0.1"
Default host used for connection
- DEFAULT_VHOST = "/"
Default virtual host used for connection
- DEFAULT_USER = "guest"
Default username used for connection
- DEFAULT_PASSWORD = "guest"
Default password used for connection
- DEFAULT_HEARTBEAT = :server
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
- DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{
  :capabilities => {
    :publisher_confirms => true,
    :consumer_cancel_notify => true,
    :exchange_exchange_bindings => true,
    :"basic.nack" => true,
    :"connection.blocked" => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product => "Bunny",
  :platform => ::RUBY_DESCRIPTION,
  :version => Bunny::VERSION,
  :information => "http://rubybunny.info",
}
- DEFAULT_NETWORK_RECOVERY_INTERVAL = 5.0
Default reconnection interval for TCP connection failures
Instance Attribute Summary
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL".
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary
-
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
Instance Method Summary
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#close ⇒ Object
(also: #stop)
Closes the connection.
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
-
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
- #heartbeat_disabled?(val) ⇒ Boolean protected
-
#heartbeat_interval ⇒ Integer
Heartbeat interval used.
- #host ⇒ Object
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #ignoring_io_errors(&block) ⇒ Object protected
-
#initialize(connection_string_or_opts = Hash.new, optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
- #normalize_client_channel_max(n) ⇒ Object protected
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
-
#password ⇒ String
Password used.
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
- #reset_address_index ⇒ Object
-
#start ⇒ Object
Starts the connection process.
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
-
#username ⇒ String
Username used.
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = Hash.new, optz = Hash.new) ⇒ Session
Returns a new instance of Session.
# File 'lib/bunny/session.rb', line 127
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
  opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts = opts
  @addresses = self.addresses_from(opts)
  @address_index = 0

  @user = self.username_from(opts)
  @pass = self.password_from(opts)
  @vhost = self.vhost_from(opts)
  @threaded = opts.fetch(:threaded, true)

  log_file = opts[:log_file] || opts[:logfile] || STDOUT
  log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  @logger = opts.fetch(:logger, init_default_logger(log_file, log_level))

  validate_connection_options(opts)

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @recovery_attempts = opts[:recovery_attempts]
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status = :not_connected
  @blocked = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be re-negotiated during connection tuning steps. MK.
  @channel_max = @client_channel_max
  @client_heartbeat = self.heartbeat_from(opts)

  @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
  @mechanism = opts.fetch(:auth_mechanism, "PLAIN")
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex = @mutex_impl.new
  @status_mutex = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels = Hash.new

  @origin_thread = Thread.current

  self.reset_continuations
  self.initialize_transport
end
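Two parts of the constructor are easy to misread: the RABBITMQ_URL environment variable takes precedence over the first argument, and automatic recovery defaults to on only when neither :automatically_recover nor :automatic_recovery is supplied. The stand-alone sketch below mirrors that logic outside of Bunny. The names resolve_opts and recovery_enabled?, and the placeholder parse lambda, are hypothetical and exist only for illustration.

```ruby
# Stand-alone sketch; resolve_opts, recovery_enabled? and the parse lambda
# are hypothetical names, not part of Bunny.

# Mirrors the constructor's case expression: RABBITMQ_URL (if set) wins over
# the first argument, and the second hash is merged on top of the result.
def resolve_opts(connection_string_or_opts, optz = {}, env: ENV, parse: ->(uri) { { :uri => uri } })
  source = env["RABBITMQ_URL"] || connection_string_or_opts
  base = case source
         when nil    then {}
         when String then parse.call(source)
         when Hash   then source
         end
  base.merge(optz)
end

# Mirrors the :automatically_recover / :automatic_recovery handling:
# true when neither key is given, otherwise the first truthy of the two.
def recovery_enabled?(opts)
  if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
    true
  else
    opts[:automatically_recover] || opts[:automatic_recovery]
  end
end

from_string = resolve_opts("amqp://a", { :heartbeat => 10 }, env: {})
from_env    = resolve_opts({ :host => "h" }, {}, env: { "RABBITMQ_URL" => "amqp://env" })
```

Note that in the second call the hash argument is ignored entirely, because the environment variable is consulted first.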
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
# File 'lib/bunny/session.rb', line 83
def channel_id_allocator
  @channel_id_allocator
end
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max.
# File 'lib/bunny/session.rb', line 81
def channel_max
  @channel_max
end
#continuation_timeout ⇒ Integer (readonly)
Returns the timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds. Default is 4000.
# File 'lib/bunny/session.rb', line 90
def continuation_timeout
  @continuation_timeout
end
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
# File 'lib/bunny/session.rb', line 81
def frame_max
  @frame_max
end
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
# File 'lib/bunny/session.rb', line 81
def heartbeat
  @heartbeat
end
#logger ⇒ Logger (readonly)
# File 'lib/bunny/session.rb', line 88
def logger
  @logger
end
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# File 'lib/bunny/session.rb', line 86
def mechanism
  @mechanism
end
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
# File 'lib/bunny/session.rb', line 81
def pass
  @pass
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
# File 'lib/bunny/session.rb', line 81
def port
  @port
end
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
# File 'lib/bunny/session.rb', line 82
def server_authentication_mechanisms
  @server_authentication_mechanisms
end
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
# File 'lib/bunny/session.rb', line 82
def server_capabilities
  @server_capabilities
end
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
# File 'lib/bunny/session.rb', line 82
def server_locales
  @server_locales
end
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
# File 'lib/bunny/session.rb', line 82
def server_properties
  @server_properties
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/bunny/session.rb', line 81
def status
  @status
end
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
# File 'lib/bunny/session.rb', line 81
def threaded
  @threaded
end
#transport ⇒ Bunny::Transport (readonly)
# File 'lib/bunny/session.rb', line 80
def transport
  @transport
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
# File 'lib/bunny/session.rb', line 81
def user
  @user
end
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
# File 'lib/bunny/session.rb', line 81
def vhost
  @vhost
end
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
# File 'lib/bunny/session.rb', line 441
def self.parse_uri(uri)
  AMQ::Settings.parse_amqp_url(uri)
end
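parse_uri delegates to AMQ::Settings.parse_amqp_url. As a rough illustration of the pieces an amqp[s] URI carries, here is a stdlib-only sketch. The helper sketch_parse_amqp_uri and its hash keys are hypothetical and may differ from what AMQ::Settings actually returns. 5672 and 5671 are the standard ports for AMQP and AMQP over TLS.

```ruby
require "uri"

# Hypothetical helper, not Bunny's implementation. The hash keys are
# illustrative only.
def sketch_parse_amqp_uri(string)
  uri = URI.parse(string)
  unless %w[amqp amqps].include?(uri.scheme)
    raise ArgumentError, "expected an amqp or amqps URI, got #{string.inspect}"
  end

  tls  = uri.scheme == "amqps"
  path = uri.path.to_s
  {
    :host  => uri.host,
    :port  => uri.port || (tls ? 5671 : 5672),
    :user  => uri.user,
    :pass  => uri.password,
    # drop the leading "/" and percent-decode; nil when the URI has no path
    :vhost => path.empty? ? nil : URI.decode_www_form_component(path[1..-1]),
    :tls   => tls
  }
end

settings = sketch_parse_amqp_uri("amqps://guest:secret@rabbit.example.com/%2Fstaging")
```

The virtual host is percent-encoded in the URI, so a vhost named "/staging" is written as "%2Fstaging" in the path.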
Instance Method Details
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
# File 'lib/bunny/session.rb', line 405
def automatically_recover?
  @automatically_recover
end
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
# File 'lib/bunny/session.rb', line 433
def blocked?
  @blocked
end
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
# File 'lib/bunny/session.rb', line 737
def clean_up_and_fail_on_connection_close!(method)
  @last_connection_error = instantiate_connection_level_exception(method)
  @continuations.push(method)

  clean_up_on_shutdown
  if threaded?
    @origin_thread.raise(@last_connection_error)
  else
    raise @last_connection_error
  end
end
#clean_up_on_shutdown ⇒ Object
# File 'lib/bunny/session.rb', line 749
def clean_up_on_shutdown
  begin
    shut_down_all_consumer_work_pools!
    maybe_shutdown_reader_loop
    maybe_shutdown_heartbeat_sender
  rescue ShutdownSignal => sse
    # no-op
  rescue Exception => e
    @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
  ensure
    close_transport
  end
end
#close ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
# File 'lib/bunny/session.rb', line 349
def close
  @status_mutex.synchronize { @status = :closing }

  ignoring_io_errors do
    if @transport.open?
      close_all_channels

      self.close_connection(true)
    end

    clean_up_on_shutdown
  end
  @status_mutex.synchronize { @status = :closed }
end
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
# File 'lib/bunny/session.rb', line 392
def closed?
  @status_mutex.synchronize { @status == :closed }
end
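The status predicates read @status inside @status_mutex.synchronize, and the constructor defaults :mutex_impl to Monitor with a comment about non-reentrant Ruby mutexes. The sketch below shows why that choice matters when one synchronized method calls another. StatusTracker is a hypothetical class that models only the status bookkeeping; its nested locking is for illustration and is not claimed to match Bunny's call graph.

```ruby
require "monitor"

# Hypothetical class; models only the status bookkeeping.
class StatusTracker
  def initialize(mutex_impl = Monitor)
    @status_mutex = mutex_impl.new
    @status = :not_connected
  end

  def transition_to(new_status)
    @status_mutex.synchronize { @status = new_status }
  end

  def closed?
    @status_mutex.synchronize { @status == :closed }
  end

  # Takes the lock, then calls two methods that take the same lock again.
  def close
    @status_mutex.synchronize do
      transition_to(:closed) unless closed?
    end
  end
end

tracker = StatusTracker.new
tracker.close # works: Monitor may be re-entered by the thread that holds it

begin
  StatusTracker.new(Mutex).close
  mutex_result = :no_error
rescue ThreadError
  mutex_result = :thread_error # Mutex refuses recursive locking
end
```

With a plain Mutex, the nested synchronize raises ThreadError in the owning thread, whereas Monitor lets the same thread re-enter.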
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing.
# File 'lib/bunny/session.rb', line 387
def closing?
  @status_mutex.synchronize { @status == :closing }
end
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection. Accepts a block that the socket will be yielded to.
# File 'lib/bunny/session.rb', line 255
def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport.configure_socket(&block)
end
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
# File 'lib/bunny/session.rb', line 381
def connecting?
  status == :connecting
end
#create_channel(n = nil, consumer_pool_size = 1) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
# File 'lib/bunny/session.rb', line 333
def create_channel(n = nil, consumer_pool_size = 1)
  raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n

  @channel_mutex.synchronize do
    if n && (ch = @channels[n])
      ch
    else
      ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
      ch.open
      ch
    end
  end
end
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
# File 'lib/bunny/session.rb', line 473
def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
#heartbeat_disabled?(val) ⇒ Boolean (protected)
# File 'lib/bunny/session.rb', line 1122
def heartbeat_disabled?(val)
  0 == val || val.nil?
end
#heartbeat_interval ⇒ Integer
Returns the heartbeat interval used.
# File 'lib/bunny/session.rb', line 219
def heartbeat_interval; self.heartbeat; end
#host ⇒ Object
# File 'lib/bunny/session.rb', line 238
def host
  @transport ? @transport.host : host_from_address(@addresses[@address_index])
end
#hostname ⇒ String
Returns the RabbitMQ hostname (or IP address) used.
# File 'lib/bunny/session.rb', line 210
def hostname; self.host; end
#ignoring_io_errors(&block) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1255
def ignoring_io_errors(&block)
  begin
    block.call
  rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _
    # ignore
  end
end
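To see which failures this kind of wrapper swallows, here is a reduced sketch that uses only the stdlib exception classes. ignoring_io_errors_sketch is a hypothetical name, and the two library-specific classes (AMQ::Protocol::EmptyResponseError and Bunny::NetworkFailure) are left out so the example runs without Bunny.

```ruby
# Hypothetical stand-alone helper; the real method also rescues
# AMQ::Protocol::EmptyResponseError and Bunny::NetworkFailure.
def ignoring_io_errors_sketch
  yield
rescue IOError, SystemCallError
  nil # swallowed on purpose
end

reset = ignoring_io_errors_sketch { raise Errno::ECONNRESET } # a SystemCallError
eof   = ignoring_io_errors_sketch { raise EOFError }          # an IOError
value = ignoring_io_errors_sketch { :done }                   # no error: block value
```

Errors outside those families, such as ArgumentError, still propagate to the caller.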
#inspect ⇒ Object
# File 'lib/bunny/session.rb', line 992
def inspect
  to_s
end
#local_port ⇒ Integer
Returns the client socket port.
# File 'lib/bunny/session.rb', line 262
def local_port
  @transport.local_address.ip_port
end
#normalize_client_channel_max(n) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1244
def normalize_client_channel_max(n)
  return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT

  case n
  when 0 then
    CHANNEL_MAX_LIMIT
  else
    n
  end
end
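A few worked inputs make the clamping rule concrete: values above the limit are capped, 0 is treated as "use the limit", and anything else passes through. The sketch below is stand-alone. SKETCH_CHANNEL_MAX_LIMIT is an assumed value of 65535 standing in for CHANNEL_MAX_LIMIT, whose definition is not shown on this page.

```ruby
# Assumption: 65535 stands in for Bunny's CHANNEL_MAX_LIMIT.
SKETCH_CHANNEL_MAX_LIMIT = 65_535

# Same rule as normalize_client_channel_max, written against the assumed limit.
def normalize_channel_max_sketch(n, limit = SKETCH_CHANNEL_MAX_LIMIT)
  return limit if n > limit

  n.zero? ? limit : n
end

capped    = normalize_channel_max_sketch(100_000) # above the limit
unlimited = normalize_channel_max_sketch(0)       # 0 means "no client-side cap"
unchanged = normalize_channel_max_sketch(2047)    # ordinary value
```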
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
# File 'lib/bunny/session.rb', line 416
def on_blocked(&block)
  @block_callback = block
end
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
# File 'lib/bunny/session.rb', line 425
def on_unblocked(&block)
  @unblock_callback = block
end
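Both methods only store the block; something else has to invoke it when the broker sends the corresponding notification. The sketch below models that plumbing with a hypothetical BlockingNotifier class. The simulate_* methods stand in for whatever code handles connection.blocked and connection.unblocked frames, and the arguments passed to the callbacks are a simplification.

```ruby
# Hypothetical stand-in for a connection; only the callback plumbing is modelled.
class BlockingNotifier
  def initialize
    @blocked = false
  end

  def on_blocked(&block)
    @block_callback = block
  end

  def on_unblocked(&block)
    @unblock_callback = block
  end

  def blocked?
    @blocked
  end

  # Called by hand here; a real client would react to a broker notification.
  def simulate_blocked(reason)
    @blocked = true
    @block_callback.call(reason) if @block_callback
  end

  def simulate_unblocked
    @blocked = false
    @unblock_callback.call if @unblock_callback
  end
end

events = []
conn = BlockingNotifier.new
conn.on_blocked   { |reason| events << "blocked: #{reason}" }
conn.on_unblocked { events << "unblocked" }

conn.simulate_blocked("low on memory")
conn.simulate_unblocked
```

Registering a callback a second time replaces the previous one, since each method assigns to a single instance variable.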
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
# File 'lib/bunny/session.rb', line 397
def open?
  @status_mutex.synchronize do
    (status == :open || status == :connected || status == :connecting) && @transport.open?
  end
end
#password ⇒ String
Returns the password used.
# File 'lib/bunny/session.rb', line 214
def password; self.pass; end
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
# File 'lib/bunny/session.rb', line 453
def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
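The passive-declare pattern behind queue_exists? and exchange_exists? can be tried without a broker. In AMQP 0.9.1 a failed passive declare is a channel-level error, so the channel is already closed when the not-found exception arrives. That is why the method uses a throwaway channel and guards the close with open?. SketchNotFound and FakeChannel below are hypothetical stand-ins, not Bunny classes.

```ruby
# Hypothetical stand-ins; not Bunny classes.
class SketchNotFound < StandardError; end

class FakeChannel
  attr_reader :close_calls

  def initialize(known_queues)
    @known_queues = known_queues
    @open = true
    @close_calls = 0
  end

  def open?
    @open
  end

  def close
    @open = false
    @close_calls += 1
  end

  # Passive declaration: never creates the queue, only checks for it.
  def queue(name, opts = {})
    raise ArgumentError, "sketch only models passive declares" unless opts[:passive]

    unless @known_queues.include?(name)
      @open = false # a failed passive declare closes the channel
      raise SketchNotFound, "no queue '#{name}'"
    end
    name
  end
end

def queue_exists_sketch?(channel, name)
  channel.queue(name, :passive => true)
  true
rescue SketchNotFound
  false
ensure
  channel.close if channel.open?
end

present_channel = FakeChannel.new(["jobs"])
missing_channel = FakeChannel.new([])
present = queue_exists_sketch?(present_channel, "jobs")
missing = queue_exists_sketch?(missing_channel, "jobs")
```

In the missing case close is never called, because the channel was already closed by the failed declare.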
#reset_address_index ⇒ Object
# File 'lib/bunny/session.rb', line 246
def reset_address_index
  @address_index_mutex.synchronize { @address_index = 0 }
end
#start ⇒ Object
Starts the connection process.
# File 'lib/bunny/session.rb', line 271
def start
  return self if connected?

  @status_mutex.synchronize { @status = :connecting }
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked = false
  self.reset_continuations

  begin
    begin
      # close existing transport if we have one,
      # to not leak sockets
      @transport.maybe_initialize_socket

      @transport.post_initialize_socket
      @transport.connect

      if @socket_configurator
        @transport.configure_socket(&@socket_configurator)
      end

      self.init_connection
      self.open_connection

      @reader_loop = nil
      self.start_reader_loop if threaded?

    rescue TCPConnectionFailed => e
      @logger.warn e.message
      self.initialize_transport
      @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}"

      return self.start
    rescue
      @status_mutex.synchronize { @status = :not_connected }
      raise
    end
  rescue HostListDepleted
    self.reset_address_index
    @status_mutex.synchronize { @status = :not_connected }
    raise TCPConnectionFailedForAllHosts
  end

  self
end
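The control flow of start is easier to follow in isolation: a failed TCP connection moves on to the next configured address, and running out of addresses turns into a single "all hosts failed" error. The sketch below reproduces that shape with hypothetical exception classes and a caller-supplied attempt lambda. It uses retry where the method above calls itself recursively.

```ruby
# Hypothetical names throughout; models only the "try the next host" flow.
class SketchConnectFailed < StandardError; end
class SketchHostsDepleted < StandardError; end
class SketchAllHostsFailed < StandardError; end

def connect_with_failover(addresses, attempt)
  index = 0
  begin
    raise SketchHostsDepleted if index >= addresses.size

    address = addresses[index]
    attempt.call(address)
    address
  rescue SketchConnectFailed
    index += 1 # move on to the next endpoint in line
    retry
  rescue SketchHostsDepleted
    raise SketchAllHostsFailed, "could not connect to any of: #{addresses.join(', ')}"
  end
end

tried = []
attempt = lambda do |address|
  tried << address
  raise SketchConnectFailed, address unless address == "rabbit3:5672"
end

winner = connect_with_failover(%w[rabbit1:5672 rabbit2:5672 rabbit3:5672], attempt)
```

Errors other than the connection failure are not retried; they propagate to the caller, matching the bare rescue in start that resets the status and re-raises.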
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
# File 'lib/bunny/session.rb', line 234
def threaded?
  @threaded
end
#to_s ⇒ String
# File 'lib/bunny/session.rb', line 987
def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end
#username ⇒ String
Returns the username used.
# File 'lib/bunny/session.rb', line 212
def username; self.user; end
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
# File 'lib/bunny/session.rb', line 228
def uses_ssl?
  @transport.uses_ssl?
end
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
# File 'lib/bunny/session.rb', line 222
def uses_tls?
  @transport.uses_tls?
end
#validate_connection_options(options) ⇒ Object
# File 'lib/bunny/session.rb', line 199
def validate_connection_options(options)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    @logger.warn "The connection options contain both a host and an array of hosts, the single host is ignored."
  end
end
#virtual_host ⇒ String
Returns the virtual host used.
# File 'lib/bunny/session.rb', line 216
def virtual_host; self.vhost; end
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
# File 'lib/bunny/session.rb', line 369
def with_channel(n = nil)
  ch = create_channel(n)
  begin
    yield ch
  ensure
    ch.close if ch.open?
  end

  self
end
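Two properties of with_channel are worth seeing in action: the channel is closed even if the block raises, and the method returns the session rather than the block's value. The sketch below demonstrates both with hypothetical SketchSession and SketchChannel classes that copy only the structure of the method above.

```ruby
# Hypothetical stand-ins that copy only the structure of with_channel.
class SketchChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

class SketchSession
  attr_reader :last_channel

  def create_channel
    @last_channel = SketchChannel.new
  end

  def with_channel
    ch = create_channel
    begin
      yield ch
    ensure
      ch.close if ch.open? # runs on normal exit and on exceptions
    end

    self
  end
end

session  = SketchSession.new
returned = session.with_channel { |ch| :block_value }
closed_after_success = !session.last_channel.open?

begin
  session.with_channel { |ch| raise "boom" }
rescue RuntimeError
  closed_after_error = !session.last_channel.open?
end
```

Because the return value is the session, any result computed inside the block has to be captured in a variable defined outside it.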