Module: AMQP::Client

Includes:
EM::Deferrable
Defined in:
lib/amqp/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connect(opts = {}) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/amqp/client.rb', line 240

# Opens a connection through EM.connect, retrying while the failure message
# is exactly "no connection".
#
# The supplied options are merged over AMQP.settings. The retry ceiling is
# three attempts per candidate server: the primary plus every entry in
# opts[:fallback_servers]. The attempt counter (@retry_count) is advanced by
# determine_reconnect_server and reset by connection_succeeded!.
#
# @param opts [Hash] overrides for AMQP.settings
# @return [Object] whatever EM.connect returns
# @raise [RuntimeError] any other RuntimeError, or "no connection" once the
#   retry ceiling has been reached
def self.connect(opts = {})
  opts = AMQP.settings.merge(opts)
  fallbacks = opts[:fallback_servers] || []
  max_retry = (fallbacks.size + 1) * 3

  begin
    host, port = determine_reconnect_server(opts)
    connection = EM.connect(host, port, self, opts)
    connection_succeeded!
    connection
  rescue RuntimeError => e
    STDERR.puts "'#{e.message}' on connect to #{host}:#{port}"
    retryable = e.message == "no connection" && @retry_count < max_retry
    raise e unless retryable
    retry
  end
end

.connection_succeeded! ⇒ Object



236
237
238
# File 'lib/amqp/client.rb', line 236

# Marks the current connection attempt as successful by zeroing the shared
# attempt counter that determine_reconnect_server increments.
#
# @return [Integer] always 0
def self.connection_succeeded!
  @retry_count = 0
end

.determine_reconnect_server(opts) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/amqp/client.rb', line 215

# Chooses the host and port for the next connection attempt.
#
# Without opts[:fallback_servers] the primary opts[:host]/opts[:port] pair is
# always returned. With fallbacks, successive calls rotate through the
# primary first and then each fallback entry in order; a fallback entry that
# lacks :host or :port takes that value from AMQP.settings.
#
# Every successful call increments @retry_count.
#
# @param opts [Hash] connection settings (:host, :port, optional :max_retry
#   and :fallback_servers, an Array of Hashes with :host / :port)
# @return [Array] two elements: host, port
# @raise [RuntimeError] when opts[:max_retry] is set and @retry_count has
#   already reached it
def self.determine_reconnect_server(opts)
  host, port = opts[:host], opts[:port]
  @retry_count ||= 0

  limit = opts[:max_retry]
  raise "max_retry (#{@retry_count}) reached, disconnecting" if limit && @retry_count >= limit

  fallbacks = opts[:fallback_servers]
  if fallbacks
    @server_to_select ||= 0
    # Slot 0 is the primary server; slot k (k >= 1) is fallbacks[k - 1].
    slot = @server_to_select % (fallbacks.size + 1)
    unless slot == 0
      candidate = fallbacks[slot - 1]
      host = candidate[:host] || AMQP.settings[:host]
      port = candidate[:port] || AMQP.settings[:port]
    end
    @server_to_select += 1
  end

  @retry_count += 1
  [host, port]
end

Instance Method Details

#add_channel(mq) ⇒ Object



116
117
118
119
120
121
# File 'lib/amqp/client.rb', line 116

# Registers a channel object under the next free numeric id.
#
# The id is one greater than the largest key currently in #channels (or 1
# when there are none). Allocation and insertion happen inside a lazily
# created mutex so the two steps are not interleaved.
#
# @param mq [Object] the channel object to register
# @return [Integer] the id the channel was stored under
def add_channel(mq)
  @_channel_mutex ||= Mutex.new
  @_channel_mutex.synchronize do
    next_id = (channels.keys.max || 0) + 1
    channels[next_id] = mq
    next_id
  end
end

#channels ⇒ Object



123
124
125
# File 'lib/amqp/client.rb', line 123

# Lazily created registry of channels, keyed by channel id.
#
# @return [Hash] the same Hash instance on every call until @channels is
#   reassigned
def channels
  @channels = {} unless @channels
  @channels
end

#close(&on_disconnect) ⇒ Object

:stopdoc:

    def send_data data
      log 'send_data', data
      super
    end

:startdoc:



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/amqp/client.rb', line 158

# Starts an orderly shutdown of the connection.
#
# When a block is given, @closing is set and the disconnect handler is
# replaced with one that runs the block and then clears @closing.
#
# The shutdown itself is queued with `callback`: if the yielded connection
# still has channels, each channel is asked to close; otherwise a
# Connection::Close frame (reply code 200, 'Goodbye') is sent.
#
# @yield invoked by the replaced disconnect handler
# @return [Object] whatever `callback` returns
def close(&on_disconnect)
  if on_disconnect
    @closing = true
    @on_disconnect = proc do
      on_disconnect.call
      @closing = false
    end
  end

  callback do |conn|
    if conn.channels.any?
      conn.channels.each { |_id, mq| mq.close }
    else
      goodbye = Protocol::Connection::Close.new(:reply_code => 200,
                                                :reply_text => 'Goodbye',
                                                :class_id => 0,
                                                :method_id => 0)
      send goodbye
    end
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/amqp/client.rb', line 106

# Reports the connection flag: false after initialize and unbind, true once
# connection_completed has run.
#
# @return [Boolean] the raw @connected value (not coerced)
def connected?
  @connected
end

#connection_completed ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/amqp/client.rb', line 83

# Finishes connection setup once the socket is established.
#
# Steps, in order:
# * starts TLS when @settings[:ssl] is set;
# * unless a close is in progress, resets the class-level retry counter and
#   restores the default disconnect handler;
# * flags the client as connected and notifies the connection_status
#   callback, if one is registered;
# * resets the receive buffer and sends the protocol header;
# * (re)arms the keepalive heartbeat when @settings[:keepalive] is set.
#
# This method runs again after every reconnect on the same object, so any
# heartbeat timer left over from the previous connection is cancelled before
# a new one is created; otherwise timers (and heartbeats) would accumulate.
#
# @return [Object, nil] the periodic timer when keepalive is enabled,
#   otherwise nil
def connection_completed
  start_tls if @settings[:ssl]
  log 'connected'
  unless @closing
    Client.connection_succeeded!
    @on_disconnect = method(:disconnected)
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  @buf = Buffer.new
  send_data HEADER
  send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')

  # Drop the heartbeat timer of the previous connection, if any.
  if @keepalive_timer
    @keepalive_timer.cancel
    @keepalive_timer = nil
  end

  if @settings[:keepalive]
    @keepalive_timer = EM.add_periodic_timer(@settings[:keepalive]) do
      send Frame::Heartbeat.new
    end
  end
end

#connection_status(&blk) ⇒ Object



255
256
257
# File 'lib/amqp/client.rb', line 255

# Registers the block that is notified of connection state changes
# (connection_completed calls it with :connected). Calling without a block
# clears the registration.
#
# @yieldparam status [Symbol] the new state
# @return [Proc, nil] the stored block
def connection_status(&blk)
  @connection_status = blk
end

#initialize(opts = {}) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/amqp/client.rb', line 71

# Prepares the client state for a (re)connection.
#
# Stores the settings, extends the instance with the module returned by
# AMQP.client, installs the default disconnect handler if none is set,
# applies the optional :timeout setting and marks the client as not
# connected. The errback that fires the disconnect handler is only added
# when @reconnecting is not set; #reconnect sets that flag before calling
# this method again.
#
# @param opts [Hash] connection settings
def initialize(opts = {})
  @settings = opts
  extend AMQP.client

  @on_disconnect ||= method(:disconnected)

  configured_timeout = @settings[:timeout]
  timeout(configured_timeout) if configured_timeout
  errback { @on_disconnect.call } unless @reconnecting

  @connected = false
end

#process_frame(frame) ⇒ Object



137
138
139
140
# File 'lib/amqp/client.rb', line 137

# Placeholder frame handler; does nothing and returns nil.
#
# @param frame [Object] a parsed frame (ignored here)
# @return [nil]
def process_frame(frame)
  # Intentionally empty: this stub is meant to be replaced by the module
  # passed into initialize.
end

#receive_data(data) ⇒ Object



127
128
129
130
131
132
133
134
135
# File 'lib/amqp/client.rb', line 127

# Appends incoming bytes to the receive buffer and dispatches every complete
# frame that Frame.parse can extract from it.
#
# Parsing stops as soon as Frame.parse returns nil/false; whatever is left in
# @buf stays there for the next call.
#
# @param data [String] raw bytes read from the socket
# @return [nil]
def receive_data(data)
  @buf << data

  frame = Frame.parse(@buf)
  while frame
    log 'receive', frame
    process_frame frame
    frame = Frame.parse(@buf)
  end
end

#reconnect(force = false, use_host = nil, use_port = nil) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/amqp/client.rb', line 181

# Re-establishes the connection on the same handler object.
#
# * If a reconnect is already in progress and +force+ is false, a forced
#   retry is scheduled after @settings[:reconnect_timer] seconds (default 1)
#   and the method returns immediately.
# * On the first attempt the client is flagged as reconnecting, its deferred
#   status is cleared, initialize is re-run with the current settings and
#   every previously registered channel is reset.
# * The target is the explicit use_host/use_port pair when both are given,
#   otherwise the next server from Client.determine_reconnect_server.
# * A RuntimeError whose message is "no connection" triggers another
#   (delayed) reconnect; any other RuntimeError is re-raised.
#
# @param force [Boolean] attempt now even if a reconnect is in progress
# @param use_host [String, nil] explicit host (used only with use_port)
# @param use_port [Integer, nil] explicit port (used only with use_host)
# @return [Object, nil] the result of EM.reconnect, or nil when the attempt
#   was deferred to a timer
def reconnect(force = false, use_host = nil, use_port = nil)
  if @reconnecting && !force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    delay = @settings[:reconnect_timer] || 1
    EM.add_timer(delay) { reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true

    @deferred_status = nil
    initialize(@settings)

    stale_channels = @channels
    @channels = {}
    stale_channels.each { |_id, mq| mq.reset } if stale_channels
  end

  log 'reconnecting'
  begin
    if use_host && use_port
      try_host = use_host
      try_port = use_port
    else
      try_host, try_port = Client.determine_reconnect_server(@settings)
      STDERR.puts "Reconnecting to #{try_host}:#{try_port}"
    end
    EM.reconnect(try_host, try_port, self)
  rescue RuntimeError => e
    STDERR.puts "'#{e.message}' on reconnect to #{try_host}:#{try_port}"
    raise e unless e.message == "no connection"
    return reconnect
  end
end

#send(data, opts = {}) ⇒ Object



142
143
144
145
146
147
148
149
# File 'lib/amqp/client.rb', line 142

# Serializes +data+ as a frame on the requested channel and writes it to the
# socket.
#
# Anything that is not already a Frame is converted with to_frame(channel);
# the frame's channel is then set explicitly so that pre-built frames are
# also routed to the requested channel.
#
# The options hash is only read, never written, so callers may pass a shared
# or frozen Hash.
#
# @param data [Frame, #to_frame] frame or payload to transmit
# @param opts [Hash] :channel selects the channel number (default 0)
# @return [Object] whatever send_data returns
def send(data, opts = {})
  channel = opts[:channel] || 0
  data = data.to_frame(channel) unless data.is_a? Frame
  data.channel = channel

  log 'send', data
  send_data data.to_s
end

#unbind ⇒ Object



110
111
112
113
114
# File 'lib/amqp/client.rb', line 110

# Handles loss of the connection: logs it, clears the connected flag and
# schedules the disconnect handler on the next reactor tick.
#
# @return [Object] whatever EM.next_tick returns
def unbind
  log 'disconnected'
  @connected = false
  # @on_disconnect is read when the tick runs, not here, so a handler that is
  # swapped in before the tick is the one that gets called.
  EM.next_tick do
    @on_disconnect.call
  end
end