Class: AMQ::Client::Async::CoolioClient

Inherits:
Object
  • Object
show all
Includes:
Adapter
Defined in:
lib/amq/client/async/adapters/coolio.rb

Overview

CoolioClient is a drop-in replacement for EventMachineClient, if you prefer cool.io style.

Constant Summary

Constants included from Openable

Openable::VALUES

Instance Attribute Summary collapse

Attributes included from Openable

#status

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Adapter

#auth_mechanism_adapter, #auto_recover, #auto_recovering?, #before_recovery, #content_complete?, #disconnect, #encode_credentials, #frameset_complete?, #get_next_frame, #handle_close, #handle_close_ok, #handle_open_ok, #handle_start, #handle_tune, #handshake, #heartbeat_interval, #heartbeats_enabled?, #negotiate_heartbeat_value, #on_connection_interruption, #on_error, #on_possible_authentication_failure, #on_recovery, #on_skipped_heartbeats, #on_tcp_connection_loss, #open, #receive_frame, #receive_frameset, #reconnecting?, #reset_state!, #send_frame, #send_frameset, #send_heartbeat, #send_preamble, #start_automatic_recovery, #tcp_connection_failed, #tcp_connection_lost, #vhost

Methods included from RegisterEntityMixin

#register_entity

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initializeCoolioClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Performs basic initialization. Do not use this method directly, use CoolioClient.connect instead

See Also:

  • AMQ::Client::Adapter::ClassMethods#connect


142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/amq/client/async/adapters/coolio.rb', line 142

def initialize
  # Be careful with default values for #ruby hashes: h = Hash.new(Array.new); h[:key] ||= 1
  # won't assign anything to :key. MK.
  @callbacks    = Hash.new

  self.logger   = self.class.logger

  # channel => collected frames. MK.
  @frames            = Hash.new { Array.new }
  @channels          = Hash.new

  @mechanism         = "PLAIN"
end

Instance Attribute Details

#callbacksObject

Hash with available callbacks



98
99
100
# File 'lib/amq/client/async/adapters/coolio.rb', line 98

def callbacks
  @callbacks
end

Class Method Details

.tcp_connection_failure_exception_classObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns class used for tcp connection failures.



274
275
276
# File 'lib/amq/client/async/adapters/coolio.rb', line 274

def self.tcp_connection_failure_exception_class
  AMQ::Client::TCPConnectionFailed
end

Instance Method Details

#close_connectionObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the socket.



267
268
269
# File 'lib/amq/client/async/adapters/coolio.rb', line 267

def close_connection
  @socket.close
end

#connection_successfulObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by AMQ::Client::Connection after we receive connection.open-ok.



181
182
183
184
185
186
# File 'lib/amq/client/async/adapters/coolio.rb', line 181

def connection_successful
  @authenticating = false
  opened!

  exec_callback_yielding_self(:connect)
end

#disconnection_successfulObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by AMQ::Client::Connection after we receive connection.close-ok.



192
193
194
195
196
# File 'lib/amq/client/async/adapters/coolio.rb', line 192

def disconnection_successful
  exec_callback_yielding_self(:disconnect)
  close_connection
  closed!
end

#establish_connection(settings) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a socket and attaches it to cool.io default loop.

Called from CoolioClient.connect

Parameters:

  • connection (Hash)

    settings

See Also:

  • AMQ::Client::Adapter::ClassMethods#connect


107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/amq/client/async/adapters/coolio.rb', line 107

def establish_connection(settings)
  @settings     = Settings.configure(settings)

  socket = Socket.connect(self, @settings[:host], @settings[:port])
  socket.attach(Cool.io::Loop.default)
  self.socket = socket


  @on_tcp_connection_failure          = @settings[:on_tcp_connection_failure] || Proc.new { |settings|
    raise self.class.tcp_connection_failure_exception_class.new(settings)
  }
  @on_possible_authentication_failure = @settings[:on_possible_authentication_failure] || Proc.new { |settings|
    raise self.class.authentication_failure_exception_class.new(settings)
  }

  @locale            = @settings.fetch(:locale, "en_GB")
  @client_properties = Settings.client_properties.merge(@settings.fetch(:client_properties, Hash.new))

  @auto_recovery     = (!!@settings[:auto_recovery])

  socket
end

#handle_skipped_hearbeatsObject



282
283
284
# File 'lib/amq/client/async/adapters/coolio.rb', line 282

def handle_skipped_hearbeats
  # TODO
end

#initialize_heartbeat_senderObject

self.tcp_connection_failure_exception_class



278
279
280
# File 'lib/amq/client/async/adapters/coolio.rb', line 278

def initialize_heartbeat_sender
  # TODO
end

#on_closed(&block) ⇒ Object Also known as: on_disconnection

Sets a callback for disconnection (as in client-side disconnection)



167
168
169
# File 'lib/amq/client/async/adapters/coolio.rb', line 167

def on_closed(&block)
  define_callback :disconnect, &block
end

#on_open(&block) ⇒ Object Also known as: on_connection

Sets a callback for successful connection (after we receive open-ok)



159
160
161
# File 'lib/amq/client/async/adapters/coolio.rb', line 159

def on_open(&block)
  define_callback :connect, &block
end

#on_tcp_connection_failure(&block) ⇒ Object

Sets a callback for tcp connection failure (as in can’t make initial connection)



173
174
175
# File 'lib/amq/client/async/adapters/coolio.rb', line 173

def on_tcp_connection_failure(&block)
  define_callback :tcp_connection_failure, &block
end

#post_initObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/amq/client/async/adapters/coolio.rb', line 290

def post_init
  if @had_successfully_connected_before
    @recovered = true

    self.exec_callback_yielding_self(:before_recovery, @settings)

    self.register_connection_callback do
      self.auto_recover
      self.exec_callback_yielding_self(:after_recovery, @settings)
    end
  end

  # now we can set it. MK.
  @had_successfully_connected_before = true
  @reconnecting                      = false
  @handling_skipped_hearbeats        = false

  self.reset
  self.handshake
end

#receive_data(chunk) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The story about the buffering is kinda similar to EventMachine, you keep receiving more than one frame in a single packet.

Parameters:

  • chunk (String)

    with binary data received. It could be one frame, more than one frame or less than one frame.



257
258
259
260
261
262
# File 'lib/amq/client/async/adapters/coolio.rb', line 257

def receive_data(chunk)
  @chunk_buffer << chunk
  while frame = get_next_frame
    receive_frame(AMQ::Client::Framing::String::Frame.decode(frame))
  end
end

#register_connection_callback(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers on_open callback

See Also:



133
134
135
# File 'lib/amq/client/async/adapters/coolio.rb', line 133

def register_connection_callback(&block)
  self.on_open(&block)
end

#resetObject (protected)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



312
313
314
315
# File 'lib/amq/client/async/adapters/coolio.rb', line 312

def reset
  @chunk_buffer = ""
  @frames       = Array.new
end

#send_raw(data) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sends raw data through the socket

Parameters:

  • binary (String)

    data



246
247
248
# File 'lib/amq/client/async/adapters/coolio.rb', line 246

def send_raw(data)
  socket.send_raw data
end

#socket_connectedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called when socket is connected but before handshake is done



203
204
205
# File 'lib/amq/client/async/adapters/coolio.rb', line 203

def socket_connected
  post_init
end

#socket_disconnectedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called after socket is closed



210
211
# File 'lib/amq/client/async/adapters/coolio.rb', line 210

def socket_disconnected
end