Class: GorgonBunny::Session
- Inherits:
-
Object
- Object
- GorgonBunny::Session
- Defined in:
- lib/gorgon_bunny/lib/gorgon_bunny/session.rb
Overview
Represents AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary collapse
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_FRAME_MAX =
131072
- CONNECT_TIMEOUT =
backwards compatibility
Transport::DEFAULT_CONNECTION_TIMEOUT
- DEFAULT_CONTINUATION_TIMEOUT =
if RUBY_VERSION.to_f < 1.9 8000 else 15000 end
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{ :capabilities => { :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, :"basic.nack" => true, :"connection.blocked" => true, # See http://www.rabbitmq.com/auth-notification.html :authentication_failure_close => true }, :product => "GorgonBunny", :platform => ::RUBY_DESCRIPTION, :version => GorgonBunny::VERSION, :information => "http://rubybunny.info", }
- DEFAULT_LOCALE =
"en_GB"
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
Instance Attribute Summary collapse
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#default_channel ⇒ Object
readonly
Returns the value of attribute default_channel.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g.
- #mutex_impl ⇒ Object readonly
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #transport ⇒ GorgonBunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary collapse
-
.parse_uri(uri) ⇒ Hash
Parses an amqp URI into a hash that #initialize accepts.
Instance Method Summary collapse
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise.
-
#close ⇒ Object
(also: #stop)
Closes the connection.
- #close_all_channels ⇒ Object
- #close_channel(ch) ⇒ Object
- #close_connection(sync = true) ⇒ Object
- #close_transport ⇒ Object
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1) ⇒ GorgonBunny::Channel
(also: #channel)
Opens a new channel and returns it.
- #direct(*args) ⇒ Object
- #exchange(*args) ⇒ Object
-
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
- #fanout(*args) ⇒ Object
-
#handle_frame(ch_number, method) ⇒ Object
Handles incoming frames and dispatches them.
- #handle_frameset(ch_number, frames) ⇒ Object
- #handle_network_failure(exception) ⇒ Object
- #headers(*args) ⇒ Object
- #heartbeat_from(options) ⇒ Object
-
#heartbeat_interval ⇒ Integer
Heartbeat interval used.
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #hostname_from(options) ⇒ Object
-
#initialize(connection_string_or_opts = Hash.new, optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #instantiate_connection_level_exception(frame) ⇒ Object
- #maybe_shutdown_reader_loop ⇒ Object
- #next_channel_id ⇒ Object
-
#on_blocked {|GorgonAMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
- #open_channel(ch) ⇒ Object
-
#password ⇒ String
Password used.
- #password_from(options) ⇒ Object
- #port_from(options) ⇒ Object
- #queue(*args) ⇒ Object
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
- #raise_if_continuation_resulted_in_a_connection_error! ⇒ Object
-
#read_write_timeout ⇒ Integer
Socket operation timeout used by this connection.
- #reader_loop ⇒ Object
- #recover_channels ⇒ Object
- #recover_from_network_failure ⇒ Object
- #recoverable_network_failure?(exception) ⇒ Boolean
- #recovering_from_network_failure? ⇒ Boolean
- #register_channel(ch) ⇒ Object
- #release_channel_id(i) ⇒ Object
-
#send_frame(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open.
-
#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open.
-
#send_frameset(frames, channel) ⇒ Object
Sends multiple frames, one by one.
-
#send_frameset_without_timeout(frames, channel) ⇒ Object
Sends multiple frames, one by one.
- #send_raw_without_timeout(data, channel) ⇒ Object
- #signal_activity! ⇒ Object
-
#start ⇒ Object
Starts the connection process.
- #start_reader_loop ⇒ Object
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
- #topic(*args) ⇒ Object
- #unregister_channel(ch) ⇒ Object
-
#username ⇒ String
Username used.
- #username_from(options) ⇒ Object
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #vhost_from(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) {|ch| ... } ⇒ GorgonBunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = Hash.new, optz = Hash.new) ⇒ Session
Returns a new instance of Session.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 116 def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) when nil then Hash.new when String then self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @opts = opts @host = self.hostname_from(opts) @port = self.port_from(opts) @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @logfile = opts[:log_file] || opts[:logfile] || STDOUT @threaded = opts.fetch(:threaded, true) self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN) # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = opts.fetch(:channel_max, 65536) @client_heartbeat = self.heartbeat_from(opts) @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) @mutex_impl = @opts.fetch(:mutex_impl, Monitor) # mutex for the channel id => channel hash @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. @transport_mutex = @mutex_impl.new @channels = Hash.new @origin_thread = Thread.current self.reset_continuations self.initialize_transport end |
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
84 85 86 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 84 def channel_id_allocator @channel_id_allocator end |
#continuation_timeout ⇒ Integer (readonly)
Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.
91 92 93 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 91 def continuation_timeout @continuation_timeout end |
#default_channel ⇒ Object (readonly)
Returns the value of attribute default_channel.
83 84 85 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 83 def default_channel @default_channel end |
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def frame_max @frame_max end |
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def heartbeat @heartbeat end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def host @host end |
#logger ⇒ Logger (readonly)
89 90 91 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 89 def logger @logger end |
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. “PLAIN” or “EXTERNAL”
87 88 89 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 87 def mechanism @mechanism end |
#mutex_impl ⇒ Object (readonly)
205 206 207 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 205 def mutex_impl @mutex_impl end |
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def pass @pass end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def port @port end |
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
82 83 84 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82 def server_authentication_mechanisms @server_authentication_mechanisms end |
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
82 83 84 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82 def server_capabilities @server_capabilities end |
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
82 83 84 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82 def server_locales @server_locales end |
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
82 83 84 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82 def server_properties @server_properties end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def status @status end |
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def threaded @threaded end |
#transport ⇒ GorgonBunny::Transport (readonly)
80 81 82 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 80 def transport @transport end |
#user ⇒ Object (readonly)
Returns the value of attribute user.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def user @user end |
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81 def vhost @vhost end |
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp URI into a hash that #initialize accepts.
395 396 397 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 395 def self.parse_uri(uri) GorgonAMQ::Settings.parse_amqp_url(uri) end |
Instance Method Details
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
325 326 327 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 325 def automatically_recover? @automatically_recover end |
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise.
387 388 389 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 387 def blocked? @blocked end |
#close ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 280 def close if @transport.open? close_all_channels GorgonBunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do self.close_connection(true) end maybe_shutdown_reader_loop close_transport @status = :closed end end |
#close_all_channels ⇒ Object
471 472 473 474 475 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 471 def close_all_channels @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| GorgonBunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end end |
#close_channel(ch) ⇒ Object
459 460 461 462 463 464 465 466 467 468 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 459 def close_channel(ch) n = ch.number @transport.send_frame(GorgonAMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0)) @last_channel_close_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! self.unregister_channel(ch) @last_channel_close_ok end |
#close_connection(sync = true) ⇒ Object
478 479 480 481 482 483 484 485 486 487 488 489 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 478 def close_connection(sync = true) if @transport.open? @transport.send_frame(GorgonAMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) maybe_shutdown_heartbeat_sender @status = :not_connected if sync @last_connection_close_ok = wait_on_continuations end end end |
#close_transport ⇒ Object
759 760 761 762 763 764 765 766 767 768 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 759 def close_transport begin @transport.close rescue StandardError => e @logger.error "Exception when closing transport:" @logger.error e.class.name @logger.error e. @logger.error e.backtrace end end |
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
314 315 316 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 314 def closed? status == :closed end |
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.
209 210 211 212 213 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 209 def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end |
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
309 310 311 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 309 def connecting? status == :connecting end |
#create_channel(n = nil, consumer_pool_size = 1) ⇒ GorgonBunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
268 269 270 271 272 273 274 275 276 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 268 def create_channel(n = nil, consumer_pool_size = 1) if n && (ch = @channels[n]) ch else ch = GorgonBunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) ch.open ch end end |
#direct(*args) ⇒ Object
339 340 341 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 339 def direct(*args) @default_channel.direct(*args) end |
#exchange(*args) ⇒ Object
359 360 361 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 359 def exchange(*args) @default_channel.exchange(*args) end |
#exchange_exists?(name) ⇒ Boolean
Checks if a exchange with given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 427 def exchange_exists?(name) ch = create_channel begin ch.exchange(name, :passive => true) true rescue GorgonBunny::NotFound => _ false ensure ch.close if ch.open? end end |
#fanout(*args) ⇒ Object
344 345 346 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 344 def fanout(*args) @default_channel.fanout(*args) end |
#handle_frame(ch_number, method) ⇒ Object
Handles incoming frames and dispatches them.
Channel methods (‘channel.open-ok`, `channel.close-ok`) are handled by the session itself. Connection level errors result in exceptions being raised. Deliveries and other methods are passed on to channels to dispatch.
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 499 def handle_frame(ch_number, method) @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}" case method when GorgonAMQ::Protocol::Channel::OpenOk then @continuations.push(method) when GorgonAMQ::Protocol::Channel::CloseOk then @continuations.push(method) when GorgonAMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) @origin_thread.raise(@last_connection_error) when GorgonAMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear rescue StandardError => e @logger.error e.class.name @logger.error e. @logger.error e.backtrace ensure @continuations.push(:__unblock__) end when GorgonAMQ::Protocol::Connection::Blocked then @blocked = true @block_callback.call(method) if @block_callback when GorgonAMQ::Protocol::Connection::Unblocked then @blocked = false @unblock_callback.call(method) if @unblock_callback when GorgonAMQ::Protocol::Channel::Close then begin ch = @channels[ch_number] ch.handle_method(method) ensure self.unregister_channel(ch) end when GorgonAMQ::Protocol::Basic::GetEmpty then @channels[ch_number].handle_basic_get_empty(method) else if ch = @channels[ch_number] ch.handle_method(method) else @logger.warn "Channel #{ch_number} is not open on this connection!" end end end |
#handle_frameset(ch_number, frames) ⇒ Object
552 553 554 555 556 557 558 559 560 561 562 563 564 565 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 552 def handle_frameset(ch_number, frames) method = frames.first case method when GorgonAMQ::Protocol::Basic::GetOk then @channels[ch_number].handle_basic_get_ok(*frames) when GorgonAMQ::Protocol::Basic::GetEmpty then @channels[ch_number].handle_basic_get_empty(*frames) when GorgonAMQ::Protocol::Basic::Return then @channels[ch_number].handle_basic_return(*frames) else @channels[ch_number].handle_frameset(*frames) end end |
#handle_network_failure(exception) ⇒ Object
568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 568 def handle_network_failure(exception) raise NetworkErrorWrapper.new(exception) unless @threaded @status = :disconnected if !recovering_from_network_failure? @recovering_from_network_failure = true if recoverable_network_failure?(exception) @logger.warn "Recovering from a network failure..." @channels.each do |n, ch| ch.maybe_kill_consumer_work_pool! end maybe_shutdown_heartbeat_sender recover_from_network_failure else # TODO: investigate if we can be a bit smarter here. MK. end end end |
#headers(*args) ⇒ Object
354 355 356 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 354 def headers(*args) @default_channel.headers(*args) end |
#heartbeat_from(options) ⇒ Object
691 692 693 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 691 def heartbeat_from() [:heartbeat] || [:heartbeat_interval] || [:requested_heartbeat] || DEFAULT_HEARTBEAT end |
#heartbeat_interval ⇒ Integer
Returns Heartbeat interval used.
185 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 185 def heartbeat_interval; self.heartbeat; end |
#hostname ⇒ String
Returns RabbitMQ hostname (or IP address) used.
176 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 176 def hostname; self.host; end |
#hostname_from(options) ⇒ Object
660 661 662 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 660 def hostname_from() [:host] || [:hostname] || DEFAULT_HOST end |
#instantiate_connection_level_exception(frame) ⇒ Object
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 633 def instantiate_connection_level_exception(frame) case frame when GorgonAMQ::Protocol::Connection::Close then klass = case frame.reply_code when 320 then ConnectionForced when 501 then FrameError when 503 then CommandInvalid when 504 then ChannelError when 505 then UnexpectedFrame when 506 then ResourceError when 541 then InternalError else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end klass.new("Connection-level error: #{frame.reply_text}", self, frame) end end |
#maybe_shutdown_reader_loop ⇒ Object
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 733 def maybe_shutdown_reader_loop if @reader_loop @reader_loop.stop if threaded? # this is the easiest way to wait until the loop # is guaranteed to have terminated @reader_loop.raise(ShutdownSignal) # joining the thread here may take forever # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is # a native method that cannot be (easily) interrupted. # So we use this ugly hack or else our test suite takes forever # to run on JRuby (a new connection is opened/closed per example). MK. if defined?(JRUBY_VERSION) sleep 0.075 else @reader_loop.join end else # single threaded mode, nothing to do. MK. end end @reader_loop = nil end |
#next_channel_id ⇒ Object
696 697 698 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 696 def next_channel_id @channel_id_allocator.next_channel_id end |
#on_blocked {|GorgonAMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
370 371 372 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 370 def on_blocked(&block) @block_callback = block end |
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
379 380 381 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 379 def on_unblocked(&block) @unblock_callback = block end |
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
319 320 321 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 319 def open? (status == :open || status == :connected || status == :connecting) && @transport.open? end |
#open_channel(ch) ⇒ Object
445 446 447 448 449 450 451 452 453 454 455 456 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 445 def open_channel(ch) n = ch.number self.register_channel(ch) @transport_mutex.synchronize do @transport.send_frame(GorgonAMQ::Protocol::Channel::Open.encode(n, GorgonAMQ::Protocol::EMPTY_STRING)) end @last_channel_open_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_channel_open_ok end |
#password ⇒ String
Returns Password used.
180 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 180 def password; self.pass; end |
#password_from(options) ⇒ Object
686 687 688 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 686 def password_from() [:password] || [:pass] || [:pwd] || DEFAULT_PASSWORD end |
#port_from(options) ⇒ Object
665 666 667 668 669 670 671 672 673 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 665 def port_from() fallback = if [:tls] || [:ssl] GorgonAMQ::Protocol::TLS_PORT else GorgonAMQ::Protocol::DEFAULT_PORT end .fetch(:port, fallback) end |
#queue(*args) ⇒ Object
334 335 336 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 334 def queue(*args) @default_channel.queue(*args) end |
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
407 408 409 410 411 412 413 414 415 416 417 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 407 def queue_exists?(name) ch = create_channel begin ch.queue(name, :passive => true) true rescue GorgonBunny::NotFound => _ false ensure ch.close if ch.open? end end |
#raise_if_continuation_resulted_in_a_connection_error! ⇒ Object
547 548 549 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 547 def raise_if_continuation_resulted_in_a_connection_error! raise @last_connection_error if @last_connection_error end |
#read_write_timeout ⇒ Integer
Socket operation timeout used by this connection
259 260 261 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 259 def read_write_timeout @transport.read_write_timeout end |
#reader_loop ⇒ Object
728 729 730 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 728 def reader_loop @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current) end |
#recover_channels ⇒ Object
621 622 623 624 625 626 627 628 629 630 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 621 def recover_channels # default channel is reopened right after connection # negotiation is completed, so make sure we do not try to open # it twice. MK. @channels.reject { |n, ch| ch == @default_channel }.each do |n, ch| ch.open ch.recover_from_network_failure end end |
#recover_from_network_failure ⇒ Object
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 601 def recover_from_network_failure begin sleep @network_recovery_interval @logger.debug "About to start connection recovery..." self.initialize_transport self.start if open? @recovering_from_network_failure = false recover_channels end rescue TCPConnectionFailed, GorgonAMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in 5 seconds" sleep @network_recovery_interval retry if recoverable_network_failure?(e) end end |
#recoverable_network_failure?(exception) ⇒ Boolean
590 591 592 593 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 590 def recoverable_network_failure?(exception) # TODO: investigate if we can be a bit smarter here. MK. true end |
#recovering_from_network_failure? ⇒ Boolean
596 597 598 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 596 def recovering_from_network_failure? @recovering_from_network_failure end |
#register_channel(ch) ⇒ Object
706 707 708 709 710 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 706 def register_channel(ch) @channel_mutex.synchronize do @channels[ch.number] = ch end end |
#release_channel_id(i) ⇒ Object
701 702 703 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 701 def release_channel_id(i) @channel_id_allocator.release_channel_id(i) end |
#send_frame(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open. Exposed primarily for GorgonBunny::Channel
781 782 783 784 785 786 787 788 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 781 def send_frame(frame, signal_activity = true) if open? @transport.write(frame.encode) signal_activity! if signal_activity else raise ConnectionClosedError.new(frame) end end |
#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object
Sends frame to the peer, checking that connection is open. Uses transport implementation that does not perform timeout control. Exposed primarily for GorgonBunny::Channel.
796 797 798 799 800 801 802 803 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 796 def send_frame_without_timeout(frame, signal_activity = true) if open? @transport.write_without_timeout(frame.encode) signal_activity! if signal_activity else raise ConnectionClosedError.new(frame) end end |
#send_frameset(frames, channel) ⇒ Object
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.
809 810 811 812 813 814 815 816 817 818 819 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 809 def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do frames.each { |frame| self.send_frame(frame, false) } signal_activity! end end |
#send_frameset_without_timeout(frames, channel) ⇒ Object
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it. Uses transport implementation that does not perform timeout control.
826 827 828 829 830 831 832 833 834 835 836 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 826 def send_frameset_without_timeout(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do frames.each { |frame| self.send_frame_without_timeout(frame, false) } signal_activity! end end |
#send_raw_without_timeout(data, channel) ⇒ Object
839 840 841 842 843 844 845 846 847 848 849 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 839 def send_raw_without_timeout(data, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained # locking. Note that "single frame" methods do not need this kind of synchronization. MK. channel.synchronize do @transport.write(data) signal_activity! end end |
#signal_activity! ⇒ Object
771 772 773 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 771 def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender end |
#start ⇒ Object
Starts the connection process.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 220 def start return self if connected? @status = :connecting # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin # close existing transport if we have one, # to not leak sockets @transport.maybe_initialize_socket @transport.post_initialize_socket @transport.connect if @socket_configurator @transport.configure_socket(&@socket_configurator) end self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if threaded? @default_channel = self.create_channel rescue Exception => e @status = :not_connected raise e end self end |
#start_reader_loop ⇒ Object
723 724 725 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 723 def start_reader_loop reader_loop.start end |
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
200 201 202 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 200 def threaded? @threaded end |
#to_s ⇒ String
853 854 855 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 853 def to_s "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>" end |
#topic(*args) ⇒ Object
349 350 351 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 349 def topic(*args) @default_channel.topic(*args) end |
#unregister_channel(ch) ⇒ Object
713 714 715 716 717 718 719 720 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 713 def unregister_channel(ch) @channel_mutex.synchronize do n = ch.number self.release_channel_id(n) @channels.delete(ch.number) end end |
#username ⇒ String
Returns Username used.
178 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 178 def username; self.user; end |
#username_from(options) ⇒ Object
681 682 683 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 681 def username_from() [:username] || [:user] || DEFAULT_USER end |
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
194 195 196 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 194 def uses_ssl? @transport.uses_ssl? end |
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
188 189 190 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 188 def uses_tls? @transport.uses_tls? end |
#vhost_from(options) ⇒ Object
676 677 678 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 676 def vhost_from() [:virtual_host] || [:vhost] || DEFAULT_VHOST end |
#virtual_host ⇒ String
Returns Virtual host used.
182 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 182 def virtual_host; self.vhost; end |
#with_channel(n = nil) {|ch| ... } ⇒ GorgonBunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
300 301 302 303 304 305 306 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 300 def with_channel(n = nil) ch = create_channel(n) yield ch ch.close if ch.open? self end |