Class: GorgonBunny::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/session.rb

Overview

Represents AMQP 0.9.1 connection (to a RabbitMQ node).

Constant Summary collapse

DEFAULT_HOST =

Default host used for connection

"127.0.0.1"
DEFAULT_VHOST =

Default virtual host used for connection

"/"
DEFAULT_USER =

Default username used for connection

"guest"
DEFAULT_PASSWORD =

Default password used for connection

"guest"
DEFAULT_HEARTBEAT =

Default heartbeat interval, the same value as RabbitMQ 3.0 uses.

:server
DEFAULT_FRAME_MAX =
131072
CONNECT_TIMEOUT =

backwards compatibility

Transport::DEFAULT_CONNECTION_TIMEOUT
DEFAULT_CONTINUATION_TIMEOUT =
if RUBY_VERSION.to_f < 1.9
  8000
else
  15000
end
DEFAULT_CLIENT_PROPERTIES =

RabbitMQ client metadata

{
  :capabilities => {
    :publisher_confirms           => true,
    :consumer_cancel_notify       => true,
    :exchange_exchange_bindings   => true,
    :"basic.nack"                 => true,
    :"connection.blocked"         => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product      => "GorgonBunny",
  :platform     => ::RUBY_DESCRIPTION,
  :version      => GorgonBunny::VERSION,
  :information  => "http://rubybunny.info",
}
DEFAULT_LOCALE =
"en_GB"
DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_string_or_opts = Hash.new, optz = Hash.new) ⇒ Session

Returns a new instance of Session.

Parameters:

  • connection_string_or_opts (String, Hash) (defaults to: Hash.new)

    Connection string or a hash of connection options

  • optz (Hash) (defaults to: Hash.new)

    Extra options not related to connection

Options Hash (connection_string_or_opts):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer) — default: 600

    Heartbeat interval. 0 means no heartbeat.

  • :network_recovery_interval (Integer) — default: 4

    Recovery interval periodic network recovery will use. This includes initial pause after network failure.

  • :tls (Boolean) — default: false

    Should TLS/SSL be used?

  • :tls_cert (String) — default: nil

    Path to client TLS/SSL certificate file (.pem)

  • :tls_key (String) — default: nil

    Path to client TLS/SSL private key file (.pem)

  • :tls_ca_certificates (Array<String>)

    Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration

  • :continuation_timeout (Integer) — default: 15000

    Timeout for client operations that expect a response (e.g. Queue#get), in milliseconds.

Options Hash (optz):

  • :auth_mechanism (String) — default: "PLAIN"

    Authentication mechanism, PLAIN or EXTERNAL

  • :locale (String) — default: "PLAIN"

    Locale RabbitMQ should use

See Also:



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 116

def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
  opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @opts            = opts
  @host            = self.hostname_from(opts)
  @port            = self.port_from(opts)
  @user            = self.username_from(opts)
  @pass            = self.password_from(opts)
  @vhost           = self.vhost_from(opts)
  @logfile         = opts[:log_file] || opts[:logfile] || STDOUT
  @threaded        = opts.fetch(:threaded, true)

  self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN)

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  # in ms
  @continuation_timeout      = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status             = :not_connected
  @blocked            = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max   = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = opts.fetch(:channel_max, 65536)
  @client_heartbeat   = self.heartbeat_from(opts)

  @client_properties   = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
  @mechanism           = opts.fetch(:auth_mechanism, "PLAIN")
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale              = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl          = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex       = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex     = @mutex_impl.new
  @channels            = Hash.new

  @origin_thread       = Thread.current

  self.reset_continuations
  self.initialize_transport
end

Instance Attribute Details

#channel_id_allocatorObject (readonly)

Returns the value of attribute channel_id_allocator.



84
85
86
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 84

def channel_id_allocator
  @channel_id_allocator
end

#continuation_timeoutInteger (readonly)

Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.

Returns:

  • (Integer)

    Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.



91
92
93
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 91

def continuation_timeout
  @continuation_timeout
end

#default_channelObject (readonly)

Returns the value of attribute default_channel.



83
84
85
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 83

def default_channel
  @default_channel
end

#frame_maxObject (readonly)

Returns the value of attribute frame_max.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def frame_max
  @frame_max
end

#heartbeatObject (readonly)

Returns the value of attribute heartbeat.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def heartbeat
  @heartbeat
end

#hostObject (readonly)

Returns the value of attribute host.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def host
  @host
end

#loggerLogger (readonly)

Returns:

  • (Logger)


89
90
91
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 89

def logger
  @logger
end

#mechanismString (readonly)

Authentication mechanism, e.g. “PLAIN” or “EXTERNAL”

Returns:

  • (String)


87
88
89
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 87

def mechanism
  @mechanism
end

#mutex_implObject (readonly)



205
206
207
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 205

def mutex_impl
  @mutex_impl
end

#passObject (readonly)

Returns the value of attribute pass.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def pass
  @pass
end

#portObject (readonly)

Returns the value of attribute port.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def port
  @port
end

#server_authentication_mechanismsObject (readonly)

Returns the value of attribute server_authentication_mechanisms.



82
83
84
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82

def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilitiesObject (readonly)

Returns the value of attribute server_capabilities.



82
83
84
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82

def server_capabilities
  @server_capabilities
end

#server_localesObject (readonly)

Returns the value of attribute server_locales.



82
83
84
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82

def server_locales
  @server_locales
end

#server_propertiesObject (readonly)

Returns the value of attribute server_properties.



82
83
84
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 82

def server_properties
  @server_properties
end

#statusObject (readonly)

Returns the value of attribute status.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def status
  @status
end

#threadedObject (readonly)

Returns the value of attribute threaded.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def threaded
  @threaded
end

#transportGorgonBunny::Transport (readonly)



80
81
82
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 80

def transport
  @transport
end

#userObject (readonly)

Returns the value of attribute user.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def user
  @user
end

#vhostObject (readonly)

Returns the value of attribute vhost.



81
82
83
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 81

def vhost
  @vhost
end

Class Method Details

.parse_uri(uri) ⇒ Hash

Parses an amqp URI into a hash that #initialize accepts.

Parameters:

  • uri (String)

    amqp or amqps URI to parse

Returns:

  • (Hash)

    Parsed URI as a hash



395
396
397
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 395

def self.parse_uri(uri)
  GorgonAMQ::Settings.parse_amqp_url(uri)
end

Instance Method Details

#automatically_recover?Boolean

Returns true if this connection has automatic recovery from network failure enabled.

Returns:

  • (Boolean)

    true if this connection has automatic recovery from network failure enabled



325
326
327
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 325

def automatically_recover?
  @automatically_recover
end

#blocked?Boolean

Returns true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise.

Returns:

  • (Boolean)

    true if the connection is currently blocked by RabbitMQ because it’s running low on RAM, disk space, or other resource; false otherwise

See Also:



387
388
389
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 387

def blocked?
  @blocked
end

#closeObject Also known as: stop

Closes the connection. This involves closing all of its channels.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 280

def close
  if @transport.open?
    close_all_channels

    GorgonBunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do
      self.close_connection(true)
    end

    maybe_shutdown_reader_loop
    close_transport

    @status = :closed
  end
end

#close_all_channelsObject



471
472
473
474
475
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 471

def close_all_channels
  @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
    GorgonBunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
  end
end

#close_channel(ch) ⇒ Object



459
460
461
462
463
464
465
466
467
468
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 459

def close_channel(ch)
  n = ch.number

  @transport.send_frame(GorgonAMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0))
  @last_channel_close_ok = wait_on_continuations
  raise_if_continuation_resulted_in_a_connection_error!

  self.unregister_channel(ch)
  @last_channel_close_ok
end

#close_connection(sync = true) ⇒ Object



478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 478

def close_connection(sync = true)
  if @transport.open?
    @transport.send_frame(GorgonAMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))

    maybe_shutdown_heartbeat_sender
    @status   = :not_connected

    if sync
      @last_connection_close_ok = wait_on_continuations
    end
  end
end

#close_transportObject



759
760
761
762
763
764
765
766
767
768
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 759

def close_transport
  begin
    @transport.close
  rescue StandardError => e
    @logger.error "Exception when closing transport:"
    @logger.error e.class.name
    @logger.error e.message
    @logger.error e.backtrace
  end
end

#closed?Boolean

Returns true if this AMQP 0.9.1 connection is closed.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is closed



314
315
316
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 314

def closed?
  status == :closed
end

#configure_socket(&block) ⇒ Object

Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.

Raises:

  • (ArgumentError)


209
210
211
212
213
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 209

def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport.configure_socket(&block)
end

#connecting?Boolean

Returns true if this connection is still not fully open.

Returns:

  • (Boolean)

    true if this connection is still not fully open



309
310
311
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 309

def connecting?
  status == :connecting
end

#create_channel(n = nil, consumer_pool_size = 1) ⇒ GorgonBunny::Channel Also known as: channel

Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).

Returns:



268
269
270
271
272
273
274
275
276
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 268

def create_channel(n = nil, consumer_pool_size = 1)
  if n && (ch = @channels[n])
    ch
  else
    ch = GorgonBunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
    ch.open
    ch
  end
end

#direct(*args) ⇒ Object



339
340
341
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 339

def direct(*args)
  @default_channel.direct(*args)
end

#exchange(*args) ⇒ Object



359
360
361
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 359

def exchange(*args)
  @default_channel.exchange(*args)
end

#exchange_exists?(name) ⇒ Boolean

Checks if a exchange with given name exists.

Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.

Parameters:

  • name (String)

    Exchange name

Returns:

  • (Boolean)

    true if exchange exists



427
428
429
430
431
432
433
434
435
436
437
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 427

def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue GorgonBunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end

#fanout(*args) ⇒ Object



344
345
346
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 344

def fanout(*args)
  @default_channel.fanout(*args)
end

#handle_frame(ch_number, method) ⇒ Object

Handles incoming frames and dispatches them.

Channel methods (‘channel.open-ok`, `channel.close-ok`) are handled by the session itself. Connection level errors result in exceptions being raised. Deliveries and other methods are passed on to channels to dispatch.



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 499

def handle_frame(ch_number, method)
  @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}"
  case method
  when GorgonAMQ::Protocol::Channel::OpenOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Channel::CloseOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Connection::Close then
    @last_connection_error = instantiate_connection_level_exception(method)
    @continuations.push(method)

    @origin_thread.raise(@last_connection_error)
  when GorgonAMQ::Protocol::Connection::CloseOk then
    @last_connection_close_ok = method
    begin
      @continuations.clear
    rescue StandardError => e
      @logger.error e.class.name
      @logger.error e.message
      @logger.error e.backtrace
    ensure
      @continuations.push(:__unblock__)
    end
  when GorgonAMQ::Protocol::Connection::Blocked then
    @blocked = true
    @block_callback.call(method) if @block_callback
  when GorgonAMQ::Protocol::Connection::Unblocked then
    @blocked = false
    @unblock_callback.call(method) if @unblock_callback
  when GorgonAMQ::Protocol::Channel::Close then
    begin
      ch = @channels[ch_number]
      ch.handle_method(method)
    ensure
      self.unregister_channel(ch)
    end
  when GorgonAMQ::Protocol::Basic::GetEmpty then
    @channels[ch_number].handle_basic_get_empty(method)
  else
    if ch = @channels[ch_number]
      ch.handle_method(method)
    else
      @logger.warn "Channel #{ch_number} is not open on this connection!"
    end
  end
end

#handle_frameset(ch_number, frames) ⇒ Object



552
553
554
555
556
557
558
559
560
561
562
563
564
565
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 552

def handle_frameset(ch_number, frames)
  method = frames.first

  case method
  when GorgonAMQ::Protocol::Basic::GetOk then
    @channels[ch_number].handle_basic_get_ok(*frames)
  when GorgonAMQ::Protocol::Basic::GetEmpty then
    @channels[ch_number].handle_basic_get_empty(*frames)
  when GorgonAMQ::Protocol::Basic::Return then
    @channels[ch_number].handle_basic_return(*frames)
  else
    @channels[ch_number].handle_frameset(*frames)
  end
end

#handle_network_failure(exception) ⇒ Object



568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 568

def handle_network_failure(exception)
  raise NetworkErrorWrapper.new(exception) unless @threaded

  @status = :disconnected

  if !recovering_from_network_failure?
    @recovering_from_network_failure = true
    if recoverable_network_failure?(exception)
      @logger.warn "Recovering from a network failure..."
      @channels.each do |n, ch|
        ch.maybe_kill_consumer_work_pool!
      end
      maybe_shutdown_heartbeat_sender

      recover_from_network_failure
    else
      # TODO: investigate if we can be a bit smarter here. MK.
    end
  end
end

#headers(*args) ⇒ Object



354
355
356
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 354

def headers(*args)
  @default_channel.headers(*args)
end

#heartbeat_from(options) ⇒ Object



691
692
693
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 691

def heartbeat_from(options)
  options[:heartbeat] || options[:heartbeat_interval] || options[:requested_heartbeat] || DEFAULT_HEARTBEAT
end

#heartbeat_intervalInteger

Returns Heartbeat interval used.

Returns:

  • (Integer)

    Heartbeat interval used



185
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 185

def heartbeat_interval; self.heartbeat; end

#hostnameString

Returns RabbitMQ hostname (or IP address) used.

Returns:

  • (String)

    RabbitMQ hostname (or IP address) used



176
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 176

def hostname;     self.host;  end

#hostname_from(options) ⇒ Object



660
661
662
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 660

def hostname_from(options)
  options[:host] || options[:hostname] || DEFAULT_HOST
end

#instantiate_connection_level_exception(frame) ⇒ Object



633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 633

def instantiate_connection_level_exception(frame)
  case frame
  when GorgonAMQ::Protocol::Connection::Close then
    klass = case frame.reply_code
            when 320 then
              ConnectionForced
            when 501 then
              FrameError
            when 503 then
              CommandInvalid
            when 504 then
              ChannelError
            when 505 then
              UnexpectedFrame
            when 506 then
              ResourceError
            when 541 then
              InternalError
            else
              raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
            end

    klass.new("Connection-level error: #{frame.reply_text}", self, frame)
  end
end

#maybe_shutdown_reader_loopObject



733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 733

def maybe_shutdown_reader_loop
  if @reader_loop
    @reader_loop.stop
    if threaded?
      # this is the easiest way to wait until the loop
      # is guaranteed to have terminated
      @reader_loop.raise(ShutdownSignal)
      # joining the thread here may take forever
      # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
      # a native method that cannot be (easily) interrupted.
      # So we use this ugly hack or else our test suite takes forever
      # to run on JRuby (a new connection is opened/closed per example). MK.
      if defined?(JRUBY_VERSION)
        sleep 0.075
      else
        @reader_loop.join
      end
    else
      # single threaded mode, nothing to do. MK.
    end
  end

  @reader_loop = nil
end

#next_channel_idObject



696
697
698
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 696

def next_channel_id
  @channel_id_allocator.next_channel_id
end

#on_blocked {|GorgonAMQ::Protocol::Connection::Blocked| ... } ⇒ Object

Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).

Yields:



370
371
372
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 370

def on_blocked(&block)
  @block_callback = block
end

#on_unblocked(&block) ⇒ Object

Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.

See Also:



379
380
381
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 379

def on_unblocked(&block)
  @unblock_callback = block
end

#open?Boolean Also known as: connected?

Returns true if this AMQP 0.9.1 connection is open.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is open



319
320
321
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 319

def open?
  (status == :open || status == :connected || status == :connecting) && @transport.open?
end

#open_channel(ch) ⇒ Object



445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 445

def open_channel(ch)
  n = ch.number
  self.register_channel(ch)

  @transport_mutex.synchronize do
    @transport.send_frame(GorgonAMQ::Protocol::Channel::Open.encode(n, GorgonAMQ::Protocol::EMPTY_STRING))
  end
  @last_channel_open_ok = wait_on_continuations
  raise_if_continuation_resulted_in_a_connection_error!

  @last_channel_open_ok
end

#passwordString

Returns Password used.

Returns:

  • (String)

    Password used



180
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 180

def password;     self.pass;  end

#password_from(options) ⇒ Object



686
687
688
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 686

def password_from(options)
  options[:password] || options[:pass] || options[:pwd] || DEFAULT_PASSWORD
end

#port_from(options) ⇒ Object



665
666
667
668
669
670
671
672
673
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 665

def port_from(options)
  fallback = if options[:tls] || options[:ssl]
               GorgonAMQ::Protocol::TLS_PORT
             else
               GorgonAMQ::Protocol::DEFAULT_PORT
             end

  options.fetch(:port, fallback)
end

#queue(*args) ⇒ Object



334
335
336
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 334

def queue(*args)
  @default_channel.queue(*args)
end

#queue_exists?(name) ⇒ Boolean

Checks if a queue with given name exists.

Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.

Parameters:

  • name (String)

    Queue name

Returns:

  • (Boolean)

    true if queue exists



407
408
409
410
411
412
413
414
415
416
417
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 407

def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue GorgonBunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end

#raise_if_continuation_resulted_in_a_connection_error!Object

Raises:

  • (@last_connection_error)


547
548
549
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 547

def raise_if_continuation_resulted_in_a_connection_error!
  raise @last_connection_error if @last_connection_error
end

#read_write_timeoutInteger

Socket operation timeout used by this connection

Returns:

  • (Integer)


259
260
261
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 259

def read_write_timeout
  @transport.read_write_timeout
end

#reader_loopObject



728
729
730
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 728

def reader_loop
  @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current)
end

#recover_channelsObject



621
622
623
624
625
626
627
628
629
630
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 621

def recover_channels
  # default channel is reopened right after connection
  # negotiation is completed, so make sure we do not try to open
  # it twice. MK.
  @channels.reject { |n, ch| ch == @default_channel }.each do |n, ch|
    ch.open

    ch.recover_from_network_failure
  end
end

#recover_from_network_failureObject



601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 601

def recover_from_network_failure
  begin
    sleep @network_recovery_interval
    @logger.debug "About to start connection recovery..."
    self.initialize_transport
    self.start

    if open?
      @recovering_from_network_failure = false

      recover_channels
    end
  rescue TCPConnectionFailed, GorgonAMQ::Protocol::EmptyResponseError => e
    @logger.warn "TCP connection failed, reconnecting in 5 seconds"
    sleep @network_recovery_interval
    retry if recoverable_network_failure?(e)
  end
end

#recoverable_network_failure?(exception) ⇒ Boolean

Returns:

  • (Boolean)


590
591
592
593
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 590

def recoverable_network_failure?(exception)
  # TODO: investigate if we can be a bit smarter here. MK.
  true
end

#recovering_from_network_failure?Boolean

Returns:

  • (Boolean)


596
597
598
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 596

def recovering_from_network_failure?
  @recovering_from_network_failure
end

#register_channel(ch) ⇒ Object



706
707
708
709
710
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 706

def register_channel(ch)
  @channel_mutex.synchronize do
    @channels[ch.number] = ch
  end
end

#release_channel_id(i) ⇒ Object



701
702
703
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 701

def release_channel_id(i)
  @channel_id_allocator.release_channel_id(i)
end

#send_frame(frame, signal_activity = true) ⇒ Object

Sends frame to the peer, checking that connection is open. Exposed primarily for GorgonBunny::Channel



781
782
783
784
785
786
787
788
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 781

def send_frame(frame, signal_activity = true)
  if open?
    @transport.write(frame.encode)
    signal_activity! if signal_activity
  else
    raise ConnectionClosedError.new(frame)
  end
end

#send_frame_without_timeout(frame, signal_activity = true) ⇒ Object

Sends frame to the peer, checking that connection is open. Uses transport implementation that does not perform timeout control. Exposed primarily for GorgonBunny::Channel.



796
797
798
799
800
801
802
803
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 796

def send_frame_without_timeout(frame, signal_activity = true)
  if open?
    @transport.write_without_timeout(frame.encode)
    signal_activity! if signal_activity
  else
    raise ConnectionClosedError.new(frame)
  end
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.



809
810
811
812
813
814
815
816
817
818
819
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 809

def send_frameset(frames, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame, false) }
    signal_activity!
  end
end

#send_frameset_without_timeout(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it. Uses transport implementation that does not perform timeout control.



826
827
828
829
830
831
832
833
834
835
836
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 826

def send_frameset_without_timeout(frames, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame_without_timeout(frame, false) }
    signal_activity!
  end
end

#send_raw_without_timeout(data, channel) ⇒ Object



839
840
841
842
843
844
845
846
847
848
849
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 839

def send_raw_without_timeout(data, channel)
  # some developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    @transport.write(data)
    signal_activity!
  end
end

#signal_activity!Object



771
772
773
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 771

def signal_activity!
  @heartbeat_sender.signal_activity! if @heartbeat_sender
end

#startObject

Starts the connection process.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 220

def start
  return self if connected?

  @status        = :connecting
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked       = false
  self.reset_continuations

  begin
    # close existing transport if we have one,
    # to not leak sockets
    @transport.maybe_initialize_socket

    @transport.post_initialize_socket
    @transport.connect

    if @socket_configurator
      @transport.configure_socket(&@socket_configurator)
    end

    self.init_connection
    self.open_connection

    @reader_loop = nil
    self.start_reader_loop if threaded?

    @default_channel = self.create_channel
  rescue Exception => e
    @status = :not_connected
    raise e
  end

  self
end

#start_reader_loopObject



723
724
725
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 723

def start_reader_loop
  reader_loop.start
end

#threaded?Boolean

Returns true if this connection uses a separate thread for I/O activity.

Returns:

  • (Boolean)

    true if this connection uses a separate thread for I/O activity



200
201
202
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 200

def threaded?
  @threaded
end

#to_sString

Returns:

  • (String)


853
854
855
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 853

def to_s
  "#<#{self.class.name}:#{object_id} #{@user}@#{@host}:#{@port}, vhost=#{@vhost}>"
end

#topic(*args) ⇒ Object



349
350
351
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 349

def topic(*args)
  @default_channel.topic(*args)
end

#unregister_channel(ch) ⇒ Object



713
714
715
716
717
718
719
720
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 713

def unregister_channel(ch)
  @channel_mutex.synchronize do
    n = ch.number

    self.release_channel_id(n)
    @channels.delete(ch.number)
  end
end

#usernameString

Returns Username used.

Returns:

  • (String)

    Username used



178
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 178

def username;     self.user;  end

#username_from(options) ⇒ Object



681
682
683
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 681

def username_from(options)
  options[:username] || options[:user] || DEFAULT_USER
end

#uses_ssl?Boolean Also known as: ssl?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



194
195
196
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 194

def uses_ssl?
  @transport.uses_ssl?
end

#uses_tls?Boolean Also known as: tls?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



188
189
190
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 188

def uses_tls?
  @transport.uses_tls?
end

#vhost_from(options) ⇒ Object



676
677
678
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 676

def vhost_from(options)
  options[:virtual_host] || options[:vhost] || DEFAULT_VHOST
end

#virtual_hostString

Returns Virtual host used.

Returns:

  • (String)

    Virtual host used



182
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 182

def virtual_host; self.vhost; end

#with_channel(n = nil) {|ch| ... } ⇒ GorgonBunny::Session

Creates a temporary channel, yields it to the block given to this method and closes it.

Yields:

  • (ch)

Returns:



300
301
302
303
304
305
306
# File 'lib/gorgon_bunny/lib/gorgon_bunny/session.rb', line 300

def with_channel(n = nil)
  ch = create_channel(n)
  yield ch
  ch.close if ch.open?

  self
end