Module: AMQ::Client::Async::Adapter Abstract

Extended by:
RegisterEntityMixin
Includes:
Callbacks, Openable
Included in:
CoolioClient, EventMachineClient
Defined in:
lib/amq/client/async/adapter.rb

Overview

This module is abstract.

Base adapter class. Specific implementations (for example, EventMachine-based, Cool.io-based or sockets-based) subclass it and must implement Adapter API methods:

  • #send_raw(data)

  • #estabilish_connection(settings)

  • #close_connection

Defined Under Namespace

Modules: ClassMethods

Constant Summary

Constants included from Openable

Openable::VALUES

Instance Attribute Summary

Attributes included from Openable

#status

Error Handling and Recovery collapse

Instance Method Summary collapse

Methods included from RegisterEntityMixin

register_entity

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Instance Method Details

#auth_mechanism_adapterObject

Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.



522
523
524
# File 'lib/amq/client/async/adapter.rb', line 522

def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end

#auto_recoverObject

Performs recovery of channels that are in the automatic recovery mode. Does not run recovery callbacks.



429
430
431
# File 'lib/amq/client/async/adapter.rb', line 429

def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end

#auto_recovering?Boolean Also known as: auto_recovery?

Returns whether connection is in the automatic recovery mode.

Returns:

  • (Boolean)

    whether connection is in the automatic recovery mode



416
417
418
# File 'lib/amq/client/async/adapter.rb', line 416

def auto_recovering?
  !!@auto_recovery
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



385
386
387
# File 'lib/amq/client/async/adapter.rb', line 385

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#content_complete?(frames) ⇒ Boolean (protected)

Determines, whether given frame array contains full content body

Returns:

  • (Boolean)


706
707
708
709
710
711
# File 'lib/amq/client/async/adapter.rb', line 706

def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end

#disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block) ⇒ Object

Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.

See Also:

  • #close_connection


213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/amq/client/async/adapter.rb', line 213

def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end

#encode_credentials(username, password) ⇒ Object

See Also:



514
515
516
# File 'lib/amq/client/async/adapter.rb', line 514

def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end

#establish_connection(settings) ⇒ Object

Establish socket connection to the server.

Raises:

  • (NotImplementedError)


204
205
206
# File 'lib/amq/client/async/adapter.rb', line 204

def establish_connection(settings)
  raise NotImplementedError
end

#frameset_complete?(frames) ⇒ Boolean (protected)

Determines, whether the received frameset is ready to be further processed

Returns:

  • (Boolean)


699
700
701
702
703
# File 'lib/amq/client/async/adapter.rb', line 699

def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end

#get_next_frameObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns next frame from buffer whenever possible



681
682
683
684
685
686
687
688
689
690
691
692
693
694
# File 'lib/amq/client/async/adapter.rb', line 681

def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end

#handle_close(conn_close) ⇒ Object

Handles connection.close. When broker detects a connection level exception, this method is called.



651
652
653
654
# File 'lib/amq/client/async/adapter.rb', line 651

def handle_close(conn_close)
  closed!
  self.exec_callback_yielding_self(:error, conn_close)
end

#handle_close_ok(close_ok) ⇒ Object

Handles Connection.Close-Ok.



661
662
663
664
# File 'lib/amq/client/async/adapter.rb', line 661

def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end

#handle_open_ok(open_ok) ⇒ Object

Handles Connection.Open-Ok.



639
640
641
642
643
644
# File 'lib/amq/client/async/adapter.rb', line 639

def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end

#handle_start(connection_start) ⇒ Object

Handles connection.start.



599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
# File 'lib/amq/client/async/adapter.rb', line 599

def handle_start(connection_start)
  @server_properties                = connection_start.server_properties
  @server_capabilities              = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales                   = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
end

#handle_tune(connection_tune) ⇒ Object

Handles Connection.Tune-Ok.



622
623
624
625
626
627
628
629
630
631
632
# File 'lib/amq/client/async/adapter.rb', line 622

def handle_tune(connection_tune)
  @channel_max        = connection_tune.channel_max.freeze
  @frame_max          = connection_tune.frame_max.freeze

  client_heartbeat    = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0

  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)

  self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
  self.initialize_heartbeat_sender if heartbeats_enabled?
end

#handshakeObject

Sends connection preamble to the broker.



491
492
493
494
# File 'lib/amq/client/async/adapter.rb', line 491

def handshake
  @authenticating = true
  self.send_preamble
end

#heartbeat_intervalFixnum

Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.

Returns:

  • (Fixnum)

    Heartbeat interval this client uses, in seconds.

See Also:



277
278
279
# File 'lib/amq/client/async/adapter.rb', line 277

def heartbeat_interval
  @heartbeat_interval
end

#heartbeats_enabled?Boolean

Returns true if heartbeats are enabled (heartbeat interval is greater than 0)

Returns:

  • (Boolean)


283
284
285
# File 'lib/amq/client/async/adapter.rb', line 283

def heartbeats_enabled?
  @heartbeat_interval && (@heartbeat_interval > 0)
end

#negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)



670
671
672
673
674
675
676
# File 'lib/amq/client/async/adapter.rb', line 670

def negotiate_heartbeat_value(client_value, server_value)
  if client_value == 0 || server_value == 0
    [client_value, server_value].max
  else
    [client_value, server_value].min
  end
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



363
364
365
# File 'lib/amq/client/async/adapter.rb', line 363

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_error(&block) ⇒ Object

Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).



354
355
356
# File 'lib/amq/client/async/adapter.rb', line 354

def on_error(&block)
  self.redefine_callback(:error, &block)
end

#on_possible_authentication_failure(&block) ⇒ Object

Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.



344
345
346
# File 'lib/amq/client/async/adapter.rb', line 344

def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed after AMQP connection has recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).



401
402
403
# File 'lib/amq/client/async/adapter.rb', line 401

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#on_skipped_heartbeats(&block) ⇒ Object

Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).



462
463
464
# File 'lib/amq/client/async/adapter.rb', line 462

def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end

#on_tcp_connection_failure(&block) ⇒ Object

Defines a callback that will be run when initial TCP connection fails. You can define only one callback.



328
329
330
# File 'lib/amq/client/async/adapter.rb', line 328

def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end

#on_tcp_connection_loss(&block) ⇒ Object

Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.



336
337
338
# File 'lib/amq/client/async/adapter.rb', line 336

def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end

#open(vhost = "/") ⇒ Object

Sends connection.open to the server.



501
502
503
# File 'lib/amq/client/async/adapter.rb', line 501

def open(vhost = "/")
  self.send_frame(Protocol::Connection::Open.encode(vhost))
end

#receive_frame(frame) ⇒ Object

Processes a single frame.

Parameters:

  • frame (AMQ::Protocol::Frame)


531
532
533
534
535
536
537
538
539
540
# File 'lib/amq/client/async/adapter.rb', line 531

def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    receive_frameset(@frames[frame.channel])
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(frame.channel) if @frames[frame.channel]
  end
end

#receive_frameset(frames) ⇒ Object

Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.

Parameters:

  • frames (Array<AMQ::Protocol::Frame>)


548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
# File 'lib/amq/client/async/adapter.rb', line 548

def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end

#reconnecting?Boolean

Returns:

  • (Boolean)


319
320
321
# File 'lib/amq/client/async/adapter.rb', line 319

def reconnecting?
  @reconnecting
end

#reset_state!Object

Resets connection state.



508
509
510
# File 'lib/amq/client/async/adapter.rb', line 508

def reset_state!
  # no-op by default
end

#send_frame(frame) ⇒ Object

Sends frame to the peer, checking that connection is open.



246
247
248
249
250
251
252
# File 'lib/amq/client/async/adapter.rb', line 246

def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end

#send_frameset(frames, channel) ⇒ Object

Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.



258
259
260
261
262
263
264
265
266
267
# File 'lib/amq/client/async/adapter.rb', line 258

def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end

#send_heartbeatObject

Sends a heartbeat frame if connection is open.



580
581
582
583
584
585
586
587
588
# File 'lib/amq/client/async/adapter.rb', line 580

def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_hearbeats && @last_server_heartbeat
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_hearbeats
    end
    send_frame(Protocol::HeartbeatFrame)
  end
end

#send_preambleObject

Note:

This must be implemented by all AMQP clients.

Sends AMQ protocol header (also known as preamble).



239
240
241
# File 'lib/amq/client/async/adapter.rb', line 239

def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end

#send_raw(data) ⇒ Object

Note:

This must be implemented by all AMQP clients.

Sends opaque data to AMQ broker over active connection.

Raises:

  • (NotImplementedError)


485
486
487
# File 'lib/amq/client/async/adapter.rb', line 485

def send_raw(data)
  raise NotImplementedError
end

#start_automatic_recoveryObject

Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using #auto_recover).

Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).



445
446
447
448
449
450
451
452
453
454
# File 'lib/amq/client/async/adapter.rb', line 445

def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end

#tcp_connection_failedObject

Called when initial TCP connection fails.



303
304
305
306
307
# File 'lib/amq/client/async/adapter.rb', line 303

def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end

#tcp_connection_lostObject

Called when previously established TCP connection fails.



311
312
313
314
315
316
# File 'lib/amq/client/async/adapter.rb', line 311

def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end

#vhostString

vhost this connection uses. Default is “/”, a historically estabilished convention of RabbitMQ and amqp gem.

Returns:

  • (String)

    vhost this connection uses



293
294
295
# File 'lib/amq/client/async/adapter.rb', line 293

def vhost
  @settings.fetch(:vhost, "/")
end