Module: AMQ::Client::Async::Adapter Abstract
- Extended by:
- RegisterEntityMixin
- Included in:
- CoolioClient, EventMachineClient
- Defined in:
- lib/amq/client/async/adapter.rb
Overview
Base adapter module. Specific implementations (for example, EventMachine-based, Cool.io-based or sockets-based) include it and must implement the Adapter API methods:
- #send_raw(data)
- #establish_connection(settings)
- #close_connection
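The three Adapter API methods above can be illustrated with a minimal sketch. SketchAdapter and InMemoryClient are hypothetical names invented for this example and are not part of amq-client; the preamble bytes are the standard AMQP 0-9-1 protocol header. The sketch only shows the division of labour: the shared module builds on #send_raw, and the including class supplies the transport.

```ruby
# Hypothetical stand-in for the adapter module: only the three
# transport hooks named above are modelled here.
module SketchAdapter
  def establish_connection(settings)
    raise NotImplementedError
  end

  def send_raw(data)
    raise NotImplementedError
  end

  def close_connection
    raise NotImplementedError
  end

  # Shared behaviour built on top of the hooks (mirrors #send_preamble).
  def send_preamble
    send_raw("AMQP\x00\x00\x09\x01".b)
  end
end

# Toy transport (an assumption for illustration) that records bytes
# in memory instead of writing them to a socket.
class InMemoryClient
  include SketchAdapter

  attr_reader :written

  def establish_connection(settings)
    @settings = settings
    @written = []
    @connected = true
  end

  def send_raw(data)
    raise IOError, "not connected" unless @connected
    @written << data
  end

  def close_connection
    @connected = false
  end

  def connected?
    !!@connected
  end
end

client = InMemoryClient.new
client.establish_connection(host: "localhost")
client.send_preamble
client.close_connection
```

Because the class sits ahead of the included module in the method lookup chain, its definitions replace the NotImplementedError stubs.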
Defined Under Namespace
Modules: ClassMethods
Constant Summary
Constants included from Openable
Instance Attribute Summary
Attributes included from Openable
Error Handling and Recovery
- #auto_recover ⇒ Object
  Performs recovery of channels that are in the automatic recovery mode.
- #auto_recovering? ⇒ Boolean (also: #auto_recovery?)
  Whether connection is in the automatic recovery mode.
- #before_recovery(&block) ⇒ Object
  Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened.
- #on_connection_interruption(&block) ⇒ Object (also: #after_connection_interruption)
  Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
- #on_error(&block) ⇒ Object
  Defines a callback that will be executed when connection is closed after connection-level exception.
- #on_possible_authentication_failure(&block) ⇒ Object
  Defines a callback that will be run when TCP connection is closed before authentication finishes.
- #on_recovery(&block) ⇒ Object (also: #after_recovery)
  Defines a callback that will be executed after AMQP connection has recovered after a network failure.
- #on_skipped_heartbeats(&block) ⇒ Object
  Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected).
- #on_tcp_connection_failure(&block) ⇒ Object
  Defines a callback that will be run when initial TCP connection fails.
- #on_tcp_connection_loss(&block) ⇒ Object
  Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted).
- #reconnecting? ⇒ Boolean
- #start_automatic_recovery ⇒ Object
  Performs recovery of channels that are in the automatic recovery mode.
- #tcp_connection_failed ⇒ Object
  Called when initial TCP connection fails.
- #tcp_connection_lost ⇒ Object
  Called when previously established TCP connection fails.
Instance Method Summary
- #auth_mechanism_adapter ⇒ Object
  Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.
- #content_complete?(frames) ⇒ Boolean (protected)
  Determines whether the given frame array contains the full content body.
- #disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block) ⇒ Object
  Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.
- #encode_credentials(username, password) ⇒ Object
- #establish_connection(settings) ⇒ Object
  Establish socket connection to the server.
- #frameset_complete?(frames) ⇒ Boolean (protected)
  Determines whether the received frameset is ready to be further processed.
- #get_next_frame ⇒ Object (protected, private API)
  Returns next frame from buffer whenever possible.
- #handle_close(conn_close) ⇒ Object
  Handles connection.close.
- #handle_close_ok(close_ok) ⇒ Object
  Handles Connection.Close-Ok.
- #handle_open_ok(open_ok) ⇒ Object
  Handles Connection.Open-Ok.
- #handle_start(connection_start) ⇒ Object
  Handles connection.start.
- #handle_tune(connection_tune) ⇒ Object
  Handles connection.tune and replies with Connection.Tune-Ok.
- #handshake ⇒ Object
  Sends connection preamble to the broker.
- #heartbeat_interval ⇒ Fixnum
  Returns heartbeat interval this client uses, in seconds.
- #heartbeats_enabled? ⇒ Boolean
  Returns true if heartbeats are enabled (heartbeat interval is greater than 0).
- #negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)
- #open(vhost = "/") ⇒ Object
  Sends connection.open to the server.
- #receive_frame(frame) ⇒ Object
  Processes a single frame.
- #receive_frameset(frames) ⇒ Object
  Processes a frameset by finding and invoking a suitable handler.
- #reset_state! ⇒ Object
  Resets connection state.
- #send_frame(frame) ⇒ Object
  Sends frame to the peer, checking that connection is open.
- #send_frameset(frames, channel) ⇒ Object
  Sends multiple frames, one by one.
- #send_heartbeat ⇒ Object
  Sends a heartbeat frame if connection is open.
- #send_preamble ⇒ Object
  Sends AMQ protocol header (also known as preamble).
- #send_raw(data) ⇒ Object
  Sends opaque data to AMQ broker over active connection.
- #vhost ⇒ String
  vhost this connection uses.
Methods included from RegisterEntityMixin
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Instance Method Details
#auth_mechanism_adapter ⇒ Object
Retrieves an AuthMechanismAdapter that will encode credentials for this Adapter.
# File 'lib/amq/client/async/adapter.rb', line 522
def auth_mechanism_adapter
  @auth_mechanism_adapter ||= AuthMechanismAdapter.for_adapter(self)
end
#auto_recover ⇒ Object
Performs recovery of channels that are in the automatic recovery mode. Does not run recovery callbacks.
# File 'lib/amq/client/async/adapter.rb', line 429
def auto_recover
  @channels.select { |channel_id, ch| ch.auto_recovering? }.each { |n, ch| ch.auto_recover }
end
#auto_recovering? ⇒ Boolean Also known as: auto_recovery?
Returns whether connection is in the automatic recovery mode.
# File 'lib/amq/client/async/adapter.rb', line 416
def auto_recovering?
  !!@auto_recovery
end
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amq/client/async/adapter.rb', line 385
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
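The "last one wins" behaviour described for #before_recovery can be sketched as follows. CallbackSketch is a hypothetical, simplified store written for this example; the real Callbacks mixin offers more methods and may differ internally.

```ruby
# Hypothetical, simplified callback store (not the library's Callbacks mixin).
class CallbackSketch
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Replaces whatever was registered under +name+ with a single block.
  def redefine_callback(name, &block)
    @callbacks[name] = [block]
    self
  end

  def before_recovery(&block)
    redefine_callback(:before_recovery, &block)
  end

  # Runs every block currently registered under +name+.
  def exec_callback(name, *args)
    @callbacks[name].map { |callback| callback.call(*args) }
  end
end

log = []
connection = CallbackSketch.new
connection.before_recovery { log << :first }
connection.before_recovery { log << :second }
connection.exec_callback(:before_recovery)
# log now holds only :second, because the second definition replaced the first
```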
#content_complete?(frames) ⇒ Boolean (protected)
Determines whether the given frame array contains the full content body.
# File 'lib/amq/client/async/adapter.rb', line 706
def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  raise "Not a content header frame first: #{header.inspect}" unless header.kind_of?(AMQ::Protocol::HeaderFrame)
  header.body_size == frames[1..-1].inject(0) {|sum, frame| sum + frame.payload.size }
end
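A standalone sketch of the same check, assuming two hypothetical Struct stand-ins (HeaderFrame and BodyFrame) in place of the AMQ::Protocol frame classes. It uses bytesize rather than size so the comparison stays byte-accurate for non-binary strings; that is a choice made for this example, not something the source does.

```ruby
# Hypothetical stand-ins for the protocol frame classes.
HeaderFrame = Struct.new(:body_size)
BodyFrame   = Struct.new(:payload)

# True once the body frames add up to the size announced by the header.
def content_complete?(frames)
  return false if frames.empty?
  header = frames[0]
  unless header.is_a?(HeaderFrame)
    raise ArgumentError, "Not a content header frame first: #{header.inspect}"
  end
  header.body_size == frames[1..-1].sum { |frame| frame.payload.bytesize }
end

partial  = [HeaderFrame.new(10), BodyFrame.new("hello")]
complete = partial + [BodyFrame.new("world")]

content_complete?(partial)  # 5 of 10 bytes received so far
content_complete?(complete) # all 10 bytes received
```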
#disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block) ⇒ Object
Properly close connection with AMQ broker, as described in section 2.2.4 of the AMQP 0.9.1 specification.
# File 'lib/amq/client/async/adapter.rb', line 213
def disconnect(reply_code = 200, reply_text = "Goodbye", class_id = 0, method_id = 0, &block)
  @intentionally_closing_connection = true
  self.on_disconnection do
    @frames.clear
    block.call if block
  end

  # ruby-amqp/amqp#66, MK.
  if self.open?
    closing!
    self.send_frame(Protocol::Connection::Close.encode(reply_code, reply_text, class_id, method_id))
  elsif self.closing?
    # no-op
  else
    self.disconnection_successful
  end
end
#encode_credentials(username, password) ⇒ Object
# File 'lib/amq/client/async/adapter.rb', line 514
def encode_credentials(username, password)
  auth_mechanism_adapter.encode_credentials(username, password)
end
#establish_connection(settings) ⇒ Object
Establish socket connection to the server.
# File 'lib/amq/client/async/adapter.rb', line 204
def establish_connection(settings)
  raise NotImplementedError
end
#frameset_complete?(frames) ⇒ Boolean (protected)
Determines whether the received frameset is ready to be further processed.
# File 'lib/amq/client/async/adapter.rb', line 699
def frameset_complete?(frames)
  return false if frames.empty?
  first_frame = frames[0]
  first_frame.final? || (first_frame.method_class.has_content? && content_complete?(frames[1..-1]))
end
#get_next_frame ⇒ Object (protected)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns next frame from buffer whenever possible.
# File 'lib/amq/client/async/adapter.rb', line 681
def get_next_frame
  return nil unless @chunk_buffer.size > 7 # otherwise, cannot read the length
  # octet + short
  offset = 3 # 1 + 2
  # length
  payload_length = @chunk_buffer[offset, 4].unpack(AMQ::Protocol::PACK_UINT32).first
  # 4 bytes for long payload length, 1 byte final octet
  frame_length = offset + payload_length + 5
  if frame_length <= @chunk_buffer.size
    @chunk_buffer.slice!(0, frame_length)
  else
    nil
  end
end
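The length arithmetic in #get_next_frame follows the AMQP 0-9-1 frame layout: 1 type octet, 2 channel octets, 4 payload-size octets, the payload, and a final 0xCE frame-end octet. The sketch below replays it on a plain binary string. build_frame and next_frame are hypothetical helpers for this example, and the "N" pack directive (32-bit big-endian) is assumed to match AMQ::Protocol::PACK_UINT32.

```ruby
# Builds one wire-format frame: type (1 byte), channel (2), size (4),
# payload, frame-end octet. Hypothetical helper for illustration only.
def build_frame(type, channel, payload)
  [type, channel, payload.bytesize].pack("CnN") + payload.b + "\xCE".b
end

# Cuts one complete frame off the front of +buffer+, or returns nil
# when the buffer does not yet hold a whole frame.
def next_frame(buffer)
  return nil unless buffer.bytesize > 7 # header (7) + frame-end (1) at minimum
  offset = 3                            # type octet + channel short
  payload_length = buffer.byteslice(offset, 4).unpack("N").first
  frame_length = offset + payload_length + 5 # 4 size octets + 1 frame-end octet
  return nil if frame_length > buffer.bytesize
  buffer.slice!(0, frame_length)
end

first  = build_frame(1, 0, "abc")
second = build_frame(8, 0, "")

# One full frame followed by the first 5 bytes of the next one.
buffer  = first + second.byteslice(0, 5)
frame   = next_frame(buffer) # the complete first frame
pending = next_frame(buffer) # nil: only a partial frame remains
```

Returning nil for a partial frame lets the caller keep appending incoming chunks to the buffer and retry on the next read.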
#handle_close(conn_close) ⇒ Object
Handles connection.close. When broker detects a connection level exception, this method is called.
# File 'lib/amq/client/async/adapter.rb', line 651
def handle_close(conn_close)
  closed!
  self.exec_callback_yielding_self(:error, conn_close)
end
#handle_close_ok(close_ok) ⇒ Object
Handles Connection.Close-Ok.
# File 'lib/amq/client/async/adapter.rb', line 661
def handle_close_ok(close_ok)
  closed!
  self.disconnection_successful
end
#handle_open_ok(open_ok) ⇒ Object
Handles Connection.Open-Ok.
# File 'lib/amq/client/async/adapter.rb', line 639
def handle_open_ok(open_ok)
  @known_hosts = open_ok.known_hosts.dup.freeze

  opened!
  self.connection_successful if self.respond_to?(:connection_successful)
end
#handle_start(connection_start) ⇒ Object
Handles connection.start.
# File 'lib/amq/client/async/adapter.rb', line 599
def handle_start(connection_start)
  @server_properties = connection_start.server_properties
  @server_capabilities = @server_properties["capabilities"]

  @server_authentication_mechanisms = (connection_start.mechanisms || "").split(" ")
  @server_locales = Array(connection_start.locales)

  username = @settings[:user] || @settings[:username]
  password = @settings[:pass] || @settings[:password]

  # It's not clear whether we should transition to :opening state here
  # or in #open but in case authentication fails, it would be strange to have
  # @status undefined. So lets do this. MK.
  opening!

  self.send_frame(Protocol::Connection::StartOk.encode(@client_properties, mechanism, self.encode_credentials(username, password), @locale))
end
#handle_tune(connection_tune) ⇒ Object
Handles connection.tune and replies with Connection.Tune-Ok.
# File 'lib/amq/client/async/adapter.rb', line 622
def handle_tune(connection_tune)
  @channel_max = connection_tune.channel_max.freeze
  @frame_max = connection_tune.frame_max.freeze

  client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0

  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)

  self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
  self.initialize_heartbeat_sender if heartbeats_enabled?
end
#handshake ⇒ Object
Sends connection preamble to the broker.
# File 'lib/amq/client/async/adapter.rb', line 491
def handshake
  @authenticating = true
  self.send_preamble
end
#heartbeat_interval ⇒ Fixnum
Returns heartbeat interval this client uses, in seconds. This value may or may not be used depending on broker capabilities. Zero means the server does not want a heartbeat.
# File 'lib/amq/client/async/adapter.rb', line 277
def heartbeat_interval
  @heartbeat_interval
end
#heartbeats_enabled? ⇒ Boolean
Returns true if heartbeats are enabled (heartbeat interval is greater than 0).
# File 'lib/amq/client/async/adapter.rb', line 283
def heartbeats_enabled?
  @heartbeat_interval && (@heartbeat_interval > 0)
end
#negotiate_heartbeat_value(client_value, server_value) ⇒ Object (protected)
# File 'lib/amq/client/async/adapter.rb', line 670
def negotiate_heartbeat_value(client_value, server_value)
  if client_value == 0 || server_value == 0
    [client_value, server_value].max
  else
    [client_value, server_value].min
  end
end
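Since this method has no description, a few worked cases may help. The sketch repeats the method body shown above as a standalone function and evaluates it for sample client and server values; the sample numbers are made up for illustration.

```ruby
# Standalone copy of the negotiation rule shown above.
def negotiate_heartbeat_value(client_value, server_value)
  if client_value == 0 || server_value == 0
    # One side did not request heartbeats: take the other side's value.
    [client_value, server_value].max
  else
    # Both sides requested heartbeats: take the shorter interval.
    [client_value, server_value].min
  end
end

negotiate_heartbeat_value(0, 60)  # → 60
negotiate_heartbeat_value(30, 0)  # → 30
negotiate_heartbeat_value(30, 60) # → 30
negotiate_heartbeat_value(0, 0)   # → 0 (heartbeats stay disabled)
```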
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amq/client/async/adapter.rb', line 363
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
#on_error(&block) ⇒ Object
Defines a callback that will be executed when connection is closed after connection-level exception. Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amq/client/async/adapter.rb', line 354
def on_error(&block)
  self.redefine_callback(:error, &block)
end
#on_possible_authentication_failure(&block) ⇒ Object
Defines a callback that will be run when TCP connection is closed before authentication finishes. Usually this means authentication failure. You can define only one callback.
# File 'lib/amq/client/async/adapter.rb', line 344
def on_possible_authentication_failure(&block)
  @on_possible_authentication_failure = block
end
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amq/client/async/adapter.rb', line 401
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
#on_skipped_heartbeats(&block) ⇒ Object
Defines a callback that will be executed after time since last broker heartbeat is greater than or equal to the heartbeat interval (skipped heartbeat is detected). Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amq/client/async/adapter.rb', line 462
def on_skipped_heartbeats(&block)
  self.redefine_callback(:skipped_heartbeats, &block)
end
#on_tcp_connection_failure(&block) ⇒ Object
Defines a callback that will be run when initial TCP connection fails. You can define only one callback.
# File 'lib/amq/client/async/adapter.rb', line 328
def on_tcp_connection_failure(&block)
  @on_tcp_connection_failure = block
end
#on_tcp_connection_loss(&block) ⇒ Object
Defines a callback that will be run when TCP connection to AMQP broker is lost (interrupted). You can define only one callback.
# File 'lib/amq/client/async/adapter.rb', line 336
def on_tcp_connection_loss(&block)
  @on_tcp_connection_loss = block
end
#open(vhost = "/") ⇒ Object
Sends connection.open to the server.
# File 'lib/amq/client/async/adapter.rb', line 501
def open(vhost = "/")
  self.send_frame(Protocol::Connection::Open.encode(vhost))
end
#receive_frame(frame) ⇒ Object
Processes a single frame.
# File 'lib/amq/client/async/adapter.rb', line 531
def receive_frame(frame)
  @frames[frame.channel] ||= Array.new
  @frames[frame.channel] << frame

  if frameset_complete?(@frames[frame.channel])
    receive_frameset(@frames[frame.channel])
    # for channel.close, frame.channel will be nil. MK.
    clear_frames_on(frame.channel) if @frames[frame.channel]
  end
end
#receive_frameset(frames) ⇒ Object
Processes a frameset by finding and invoking a suitable handler. Heartbeat frames are treated in a special way: they simply update @last_server_heartbeat value.
# File 'lib/amq/client/async/adapter.rb', line 548
def receive_frameset(frames)
  if self.heartbeats_enabled?
    # treat incoming traffic as heartbeats.
    # this operation is pretty expensive under heavy traffic but heartbeats can be disabled
    # (and are also disabled by default). MK.
    @last_server_heartbeat = Time.now
  end
  frame = frames.first

  if AMQ::Protocol::HeartbeatFrame === frame
    # no-op
  else
    if callable = AMQ::Client::HandlersRegistry.find(frame.method_class)
      f = frames.shift
      callable.call(self, f, frames)
    else
      raise MissingHandlerError.new(frames.first)
    end
  end
end
#reconnecting? ⇒ Boolean
# File 'lib/amq/client/async/adapter.rb', line 319
def reconnecting?
  @reconnecting
end
#reset_state! ⇒ Object
Resets connection state.
# File 'lib/amq/client/async/adapter.rb', line 508
def reset_state!
  # no-op by default
end
#send_frame(frame) ⇒ Object
Sends frame to the peer, checking that connection is open.
# File 'lib/amq/client/async/adapter.rb', line 246
def send_frame(frame)
  if closed?
    raise ConnectionClosedError.new(frame)
  else
    self.send_raw(frame.encode)
  end
end
#send_frameset(frames, channel) ⇒ Object
Sends multiple frames, one by one. For thread safety this method takes a channel object and synchronizes on it.
# File 'lib/amq/client/async/adapter.rb', line 258
def send_frameset(frames, channel)
  # some (many) developers end up sharing channels between threads and when multiple
  # threads publish on the same channel aggressively, at some point frames will be
  # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
  # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
  # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
  channel.synchronize do
    frames.each { |frame| self.send_frame(frame) }
  end
end
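The per-channel locking described in the comment can be sketched with Ruby's standard MonitorMixin. ChannelSketch and WireSketch are hypothetical classes for this example; the real channel's #synchronize is assumed to behave like a monitor, which the source does not spell out.

```ruby
require "monitor"

# Hypothetical channel: MonitorMixin supplies #synchronize.
class ChannelSketch
  include MonitorMixin

  attr_reader :id

  def initialize(id)
    super() # initialises the monitor state
    @id = id
  end
end

# Hypothetical connection that records frames in send order.
class WireSketch
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_frame(frame)
    @sent << frame
    Thread.pass # invite a context switch to make interleaving likely
  end

  # All frames of one frameset are written while holding the channel lock.
  def send_frameset(frames, channel)
    channel.synchronize do
      frames.each { |frame| send_frame(frame) }
    end
  end
end

channel = ChannelSketch.new(1)
wire = WireSketch.new

threads = 4.times.map do |n|
  Thread.new do
    25.times do
      wire.send_frameset([[n, :method], [n, :header], [n, :body]], channel)
    end
  end
end
threads.each(&:join)
```

With the lock held for the whole frameset, every method, header and body triple from one thread stays contiguous in wire.sent, even though four threads share the channel.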
#send_heartbeat ⇒ Object
Sends a heartbeat frame if connection is open.
# File 'lib/amq/client/async/adapter.rb', line 580
def send_heartbeat
  if tcp_connection_established? && !@handling_skipped_hearbeats && @last_server_heartbeat
    if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
      logger.error "[amqp] Detected missing server heartbeats"
      self.handle_skipped_hearbeats
    end
    send_frame(Protocol::HeartbeatFrame)
  end
end
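The skipped-heartbeat test inside #send_heartbeat compares the last server heartbeat against twice the heartbeat interval. A minimal sketch of just that comparison, using a hypothetical helper named heartbeats_skipped? and fixed timestamps so the result does not depend on the clock:

```ruby
# Hypothetical helper isolating the comparison used in #send_heartbeat.
def heartbeats_skipped?(last_server_heartbeat, heartbeat_interval, now = Time.now)
  last_server_heartbeat < (now - (heartbeat_interval * 2))
end

now = Time.at(1_000)

heartbeats_skipped?(Time.at(990), 30, now) # → false, 10 seconds of silence
heartbeats_skipped?(Time.at(940), 30, now) # → false, exactly 2 intervals
heartbeats_skipped?(Time.at(939), 30, now) # → true, more than 2 intervals
```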
#send_preamble ⇒ Object
This must be implemented by all AMQP clients.
Sends AMQ protocol header (also known as preamble).
# File 'lib/amq/client/async/adapter.rb', line 239
def send_preamble
  self.send_raw(AMQ::Protocol::PREAMBLE)
end
#send_raw(data) ⇒ Object
This must be implemented by all AMQP clients.
Sends opaque data to AMQ broker over active connection.
# File 'lib/amq/client/async/adapter.rb', line 485
def send_raw(data)
  raise NotImplementedError
end
#start_automatic_recovery ⇒ Object
Performs recovery of channels that are in the automatic recovery mode. “before recovery” callbacks are run immediately, “after recovery” callbacks are run after AMQP connection is re-established and auto recovery is performed (using #auto_recover).
Use this method if you want to run automatic recovery process after handling a connection-level exception, for example, 320 CONNECTION_FORCED (used by RabbitMQ when it is shut down gracefully).
# File 'lib/amq/client/async/adapter.rb', line 445
def start_automatic_recovery
  self.run_before_recovery_callbacks
  self.register_connection_callback do
    # always run automatic recovery, because it is per-channel
    # and connection has to start it. Channels that did not opt-in for
    # autorecovery won't be selected. MK.
    self.auto_recover
    self.run_after_recovery_callbacks
  end
end
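The ordering described above ("before recovery" immediately, channel recovery and "after recovery" once the connection is back) can be traced with a toy model. RecoverySketch and its connection_reopened method are hypothetical; the real adapter triggers the registered connection callback from its own connection handling, which is not modelled here.

```ruby
# Hypothetical model that only records the order of recovery steps.
class RecoverySketch
  attr_reader :events

  def initialize
    @events = []
    @connection_callbacks = []
  end

  def run_before_recovery_callbacks
    @events << :before_recovery
  end

  def run_after_recovery_callbacks
    @events << :after_recovery
  end

  def auto_recover
    @events << :auto_recover
  end

  def register_connection_callback(&block)
    @connection_callbacks << block
  end

  # Stand-in for the moment the AMQP connection is open again.
  def connection_reopened
    @events << :connection_reopened
    @connection_callbacks.shift.call until @connection_callbacks.empty?
  end

  # Same shape as the method shown above.
  def start_automatic_recovery
    run_before_recovery_callbacks
    register_connection_callback do
      auto_recover
      run_after_recovery_callbacks
    end
  end
end

recovery = RecoverySketch.new
recovery.start_automatic_recovery
events_before_reconnect = recovery.events.dup
recovery.connection_reopened
```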
#tcp_connection_failed ⇒ Object
Called when initial TCP connection fails.
# File 'lib/amq/client/async/adapter.rb', line 303
def tcp_connection_failed
  @recovered = false

  @on_tcp_connection_failure.call(@settings) if @on_tcp_connection_failure
end
#tcp_connection_lost ⇒ Object
Called when previously established TCP connection fails.
# File 'lib/amq/client/async/adapter.rb', line 311
def tcp_connection_lost
  @recovered = false

  @on_tcp_connection_loss.call(self, @settings) if @on_tcp_connection_loss
  self.handle_connection_interruption
end
#vhost ⇒ String
vhost this connection uses. Default is “/”, a historically established convention of RabbitMQ and amqp gem.
# File 'lib/amq/client/async/adapter.rb', line 293
def vhost
  @settings.fetch(:vhost, "/")
end