Class: AMQ::Client::Async::EventMachineClient
- Inherits:
- EM::Connection
- Inheritance chain: Object, EM::Connection, AMQ::Client::Async::EventMachineClient
- Includes:
- Adapter
- Defined in:
- lib/amq/client/async/adapters/event_machine.rb
Constant Summary
- Deferrable = EventMachine::DefaultDeferrable
Backwards compatibility with 0.7.0.a25. MK.
Constants included from Openable
Instance Attribute Summary
Attributes included from Openable
Connection operations
-
.connect(settings = {}, &block) ⇒ Object
Initiates connection to AMQP broker.
-
#periodically_reconnect(period = 5) ⇒ Object
Periodically try to reconnect.
-
#reconnect(force = false, period = 5) ⇒ Object
Reconnect after a period of wait.
-
#reconnect_to(settings, period = 5) ⇒ Object
Similar to #reconnect, but uses different connection settings.
Instance Method Summary
-
#authenticating? ⇒ Boolean
Whether we are in the authentication state (after the TCP connection was established but before the broker authenticated us).
-
#connection_successful ⇒ Object
Called by AMQ::Client::Connection after we receive connection.open-ok.
-
#disconnection_successful ⇒ Object
Called by AMQ::Client::Connection after we receive connection.close-ok.
-
#establish_connection(settings) ⇒ Object
For EventMachine adapter, this is a no-op.
-
#handle_skipped_hearbeats ⇒ Object
Called when the time since the last server heartbeat was received is greater than or equal to the heartbeat interval set via the :heartbeat_interval option on the connection.
-
#initialize(*args) ⇒ EventMachineClient
constructor
A new instance of EventMachineClient.
-
#on_closed(&block) ⇒ Object
(also: #on_disconnection)
Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok).
-
#on_open(&block) ⇒ Object
(also: #on_connection)
Defines a callback that will be executed when the AMQP connection is considered open: client and broker have agreed on the max channel identifier and the maximum allowed frame size, and authentication has succeeded.
- #reset ⇒ Object protected
-
#tcp_connection_established? ⇒ Boolean
Is the TCP connection established and currently active?
- #upgrade_to_tls_if_necessary ⇒ Object protected
Methods included from Adapter
#auth_mechanism_adapter, #auto_recover, #auto_recovering?, #before_recovery, #content_complete?, #disconnect, #encode_credentials, #frameset_complete?, #get_next_frame, #handle_close, #handle_close_ok, #handle_open_ok, #handle_start, #handle_tune, #handshake, #heartbeat_interval, #heartbeats_enabled?, #negotiate_heartbeat_value, #on_connection_interruption, #on_error, #on_possible_authentication_failure, #on_recovery, #on_skipped_heartbeats, #on_tcp_connection_failure, #on_tcp_connection_loss, #open, #receive_frame, #receive_frameset, #reconnecting?, #reset_state!, #send_frame, #send_frameset, #send_heartbeat, #send_preamble, #start_automatic_recovery, #tcp_connection_failed, #tcp_connection_lost, #vhost
Methods included from RegisterEntityMixin
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(*args) ⇒ EventMachineClient
Returns a new instance of EventMachineClient.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 140

def initialize(*args)
  super(*args)

  self.logger = self.class.logger

  # channel => collected frames. MK.
  @frames = Hash.new { Array.new }
  @channels = Hash.new
  @callbacks = Hash.new

  opening!

  # track TCP connection state, used to detect initial TCP connection failures.
  @tcp_connection_established = false
  @tcp_connection_failed = false
  @intentionally_closing_connection = false

  # EventMachine::Connection's and Adapter's constructors arity
  # make it easier to use *args. MK.
  @settings = Settings.configure(args.first)
  @on_tcp_connection_failure = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
    raise self.class.tcp_connection_failure_exception_class.new(settings)
  }
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @mechanism = @settings.fetch(:auth_mechanism, "PLAIN")
  @locale = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery = (!!@settings[:auto_recovery])

  self.reset
  self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
end
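The constructor applies several defaults when a setting is missing. The following is a minimal sketch of those defaulting expressions, assuming a plain Hash in place of the library's Settings object; `effective_options` is a hypothetical helper and is not part of amq-client.

```ruby
# Hypothetical helper (not part of amq-client): mirrors the defaulting
# expressions used in #initialize, but on a plain Hash.
def effective_options(settings)
  {
    :auth_mechanism => settings.fetch(:auth_mechanism, "PLAIN"),
    :locale         => settings.fetch(:locale, "en_GB"),
    # `|| 3` also replaces an explicit nil, which fetch with a default would not
    :timeout        => (settings[:timeout] || 3).to_f,
    # `!!` coerces any truthy value to true and nil/false to false
    :auto_recovery  => !!settings[:auto_recovery]
  }
end

defaults = effective_options({})
custom   = effective_options(:timeout => "1.5", :auto_recovery => 1, :locale => "en_US")

puts defaults.inspect
puts custom.inspect
```

Note that the connect timeout is converted with `to_f`, so a numeric string is accepted in this sketch as well.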
Class Method Details
.connect(settings = {}, &block) ⇒ Object
Initiates connection to AMQP broker. If callback is given, runs it when (and if) AMQP connection succeeds.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 41

def self.connect(settings = {}, &block)
  @settings = Settings.configure(settings)

  instance = EventMachine.connect(@settings[:host], @settings[:port], self, @settings)
  instance.register_connection_callback(&block)

  instance
end
Instance Method Details
#authenticating? ⇒ Boolean
Whether we are in the authentication state (after the TCP connection was established but before the broker authenticated us).
# File 'lib/amq/client/async/adapters/event_machine.rb', line 195

def authenticating?
  @authenticating
end
#connection_successful ⇒ Object
Called by AMQ::Client::Connection after we receive connection.open-ok.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 328

def connection_successful
  @authenticating = false
  opened!

  @connection_deferrable.succeed
end
#disconnection_successful ⇒ Object
Called by AMQ::Client::Connection after we receive connection.close-ok.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 339

def disconnection_successful
  @disconnection_deferrable.succeed

  # true for "after writing buffered data"
  self.close_connection(true)
  self.reset
  closed!
end
#establish_connection(settings) ⇒ Object
For EventMachine adapter, this is a no-op.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 181

def establish_connection(settings)
  # Unfortunately there doesn't seem to be any sane way
  # how to get EventMachine connect to the instance level.
end
#handle_skipped_hearbeats ⇒ Object
Called when the time since the last server heartbeat was received is greater than or equal to the heartbeat interval set via the :heartbeat_interval option on the connection.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 352

def handle_skipped_hearbeats
  if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
    @handling_skipped_hearbeats = true
    self.cancel_heartbeat_sender

    self.run_skipped_heartbeats_callbacks
  end
end
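The guard in this method means skipped-heartbeat callbacks fire at most once per connection, and never while the client is closing on purpose. A small model of that guard, assuming a hypothetical `HeartbeatGuard` class with a counter standing in for `run_skipped_heartbeats_callbacks`:

```ruby
# Hypothetical model (not part of amq-client) of the guard condition in
# #handle_skipped_hearbeats. The method name keeps the library's spelling.
class HeartbeatGuard
  attr_reader :callback_runs

  def initialize(tcp_established, intentionally_closing)
    @tcp_connection_established = tcp_established
    @intentionally_closing_connection = intentionally_closing
    @handling_skipped_hearbeats = false
    @callback_runs = 0
  end

  def handle_skipped_hearbeats
    if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
      @handling_skipped_hearbeats = true
      @callback_runs += 1 # stands in for run_skipped_heartbeats_callbacks
    end
  end
end

live = HeartbeatGuard.new(true, false)
3.times { live.handle_skipped_hearbeats } # repeated timer ticks

closing = HeartbeatGuard.new(true, true)
closing.handle_skipped_hearbeats

puts live.callback_runs
puts closing.callback_runs
```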
#on_closed(&block) ⇒ Object Also known as: on_disconnection
Defines a callback that will be run when broker confirms connection termination (client receives connection.close-ok). You can define more than one callback.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 119

def on_closed(&block)
  @disconnection_deferrable.callback(&block)
end
#on_open(&block) ⇒ Object Also known as: on_connection
Defines a callback that will be executed when the AMQP connection is considered open: client and broker have agreed on the max channel identifier and the maximum allowed frame size, and authentication has succeeded. You can define more than one callback.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 110

def on_open(&block)
  @connection_deferrable.callback(&block)
end
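Because `on_open` and `on_closed` delegate to a deferrable, several callbacks can be registered and each runs once when success is signalled. The sketch below illustrates that pattern with `TinyDeferrable`, a hypothetical stand-in written for this example; it is not EventMachine's implementation and covers only `callback` and `succeed`.

```ruby
# Hypothetical stand-in for a deferrable (not EventMachine's own class).
class TinyDeferrable
  def initialize
    @callbacks = []
    @succeeded = false
  end

  # Queue the block, or run it right away if success was already signalled.
  def callback(&block)
    if @succeeded
      block.call
    else
      @callbacks << block
    end
    self
  end

  # Mark success and run every queued block once, in registration order.
  def succeed
    @succeeded = true
    @callbacks.shift.call until @callbacks.empty?
    self
  end
end

log = []
connection_deferrable = TinyDeferrable.new
connection_deferrable.callback { log << :first }  # like a first on_open block
connection_deferrable.callback { log << :second } # like a second on_open block
connection_deferrable.succeed                     # what #connection_successful triggers
connection_deferrable.callback { log << :late }   # registered after the fact

puts log.inspect
```

This also suggests why `#reset` creates fresh deferrables: a deferrable that has already succeeded would run newly registered callbacks immediately.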
#periodically_reconnect(period = 5) ⇒ Object
Periodically try to reconnect.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 90

def periodically_reconnect(period = 5)
  @reconnecting = true
  self.reset

  @periodic_reconnection_timer = EventMachine::PeriodicTimer.new(period) {
    EventMachine.reconnect(@settings[:host], @settings[:port], self)
  }
end
#reconnect(force = false, period = 5) ⇒ Object
Reconnect after a period of wait.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 55

def reconnect(force = false, period = 5)
  if @reconnecting and not force
    EventMachine::Timer.new(period) {
      reconnect(true, period)
    }
    return
  end

  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
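The branching in `#reconnect` depends on two inputs: whether a reconnection is already in progress and whether `force` is set. A pure-function sketch of that decision, where `reconnect_plan` and its symbols are hypothetical names used only for illustration:

```ruby
# Hypothetical model (not part of amq-client) of the decisions in #reconnect.
# Returns the steps that would be taken for a given state.
def reconnect_plan(reconnecting, force)
  # Already reconnecting and not forced: wait `period` seconds, then retry
  # with force = true.
  return [:retry_after_period] if reconnecting && !force

  steps = []
  steps << :reset unless reconnecting # state is reset only on the first attempt
  steps << :reconnect_now             # stands in for EventMachine.reconnect
  steps
end

puts reconnect_plan(false, false).inspect
puts reconnect_plan(true, false).inspect
puts reconnect_plan(true, true).inspect
```

Read this way, the timer branch is what spaces out attempts: the delayed call re-enters with `force = true` and goes straight to the reconnect step.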
#reconnect_to(settings, period = 5) ⇒ Object
Similar to #reconnect, but uses different connection settings.
# File 'lib/amq/client/async/adapters/event_machine.rb', line 74

def reconnect_to(settings, period = 5)
  if !@reconnecting
    @reconnecting = true
    self.reset
  end

  @settings = Settings.configure(settings)
  EventMachine.reconnect(@settings[:host], @settings[:port], self)
end
#reset ⇒ Object (protected)
# File 'lib/amq/client/async/adapters/event_machine.rb', line 402

def reset
  @size = 0
  @payload = ""
  @frames = Array.new

  @chunk_buffer = ""
  @connection_deferrable = EventMachine::DefaultDeferrable.new
  @disconnection_deferrable = EventMachine::DefaultDeferrable.new

  # used to track down whether authentication succeeded. AMQP 0.9.1 dictates
  # that on authentication failure broker must close TCP connection without sending
  # any more data. This is why we need to explicitly track whether we are past
  # authentication stage to signal possible authentication failures.
  @authenticating = false
end
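The comment in `#reset` explains why the `@authenticating` flag exists: the broker signals an authentication failure only by closing the TCP connection. A sketch of how a TCP close could be classified from that flag follows. This is an assumption for illustration only; the page does not show the code that consumes the flag, and `classify_tcp_close` is a hypothetical helper.

```ruby
# Hypothetical classifier (not part of amq-client). Assumes a TCP close is
# interpreted from two flags tracked by the client.
def classify_tcp_close(authenticating, intentionally_closing)
  # The client asked for the close, so nothing went wrong.
  return :expected_close if intentionally_closing

  # A close between TCP establishment and connection.open-ok most likely
  # means the broker rejected the credentials.
  authenticating ? :possible_authentication_failure : :tcp_connection_lost
end

puts classify_tcp_close(true, false)
puts classify_tcp_close(false, false)
puts classify_tcp_close(false, true)
```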
#tcp_connection_established? ⇒ Boolean
Is the TCP connection established and currently active?
# File 'lib/amq/client/async/adapters/event_machine.rb', line 202

def tcp_connection_established?
  @tcp_connection_established
end
#upgrade_to_tls_if_necessary ⇒ Object (protected)
# File 'lib/amq/client/async/adapters/event_machine.rb', line 418

def upgrade_to_tls_if_necessary
  tls_options = @settings[:ssl]

  if tls_options.is_a?(Hash)
    start_tls(tls_options)
  elsif tls_options
    start_tls
  end
end
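The `:ssl` setting therefore has three meaningful shapes: a Hash, any other truthy value, or nil/false. A minimal sketch of that dispatch, assuming a Hash value is forwarded to `start_tls` as its options; `tls_action` is a hypothetical helper that only reports which call would be made.

```ruby
# Hypothetical helper (not part of amq-client): reports which start_tls
# call each :ssl value would trigger instead of performing it.
def tls_action(ssl_setting)
  if ssl_setting.is_a?(Hash)
    [:start_tls, ssl_setting] # TLS with explicit options
  elsif ssl_setting
    [:start_tls]              # TLS with default options
  else
    [:no_tls]                 # plain TCP, no upgrade
  end
end

puts tls_action(:verify_peer => true).inspect
puts tls_action(true).inspect
puts tls_action(nil).inspect
```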