Module: AMQP::Client

Includes:
EM::Deferrable
Defined in:
lib/amqp/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connect(opts = {}) ⇒ Object



180
181
182
183
# File 'lib/amqp/client.rb', line 180

def self.connect opts = {}
  opts = AMQP.settings.merge(opts)
  EM.connect opts[:host], opts[:port], self, opts
end

Instance Method Details

#add_channel(mq) ⇒ Object



100
101
102
103
104
105
# File 'lib/amqp/client.rb', line 100

def add_channel mq
  (@_channel_mutex ||= Mutex.new).synchronize do
    channels[ key = (channels.keys.max || 0) + 1 ] = mq
    key
  end
end

#channelsObject



107
108
109
# File 'lib/amqp/client.rb', line 107

def channels
  @channels ||= {}
end

#close(&on_disconnect) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/amqp/client.rb', line 135

def close &on_disconnect
  if on_disconnect
    @closing = true
    @on_disconnect = proc{
      on_disconnect.call
      @closing = false
    }
  end

  callback{ |c|
    if c.channels.any?
      c.channels.each do |ch, mq|
        mq.close
      end
    else
      send Protocol::Connection::Close.new(:reply_code => 200,
                                           :reply_text => 'Goodbye',
                                           :class_id => 0,
                                           :method_id => 0)
    end
  }
end

#connected?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/amqp/client.rb', line 90

def connected?
  @connected
end

#connection_completedObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/amqp/client.rb', line 73

def connection_completed
  start_tls if @settings[:ssl]
  log 'connected'
  # @on_disconnect = proc{ raise Error, 'Disconnected from server' }
  unless @closing
    @on_disconnect = method(:disconnected)
    @reconnecting = false
  end

  @connected = true
  @connection_status.call(:connected) if @connection_status

  @buf = Buffer.new
  send_data HEADER
  send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
end

#connection_status(&blk) ⇒ Object



185
186
187
# File 'lib/amqp/client.rb', line 185

def connection_status &blk
  @connection_status = blk
end

#initialize(opts = {}) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/amqp/client.rb', line 61

def initialize opts = {}
  @settings = opts
  extend AMQP.client

  @on_disconnect ||= proc{ raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" }

  timeout @settings[:timeout] if @settings[:timeout]
  errback{ @on_disconnect.call } unless @reconnecting

  @connected = false
end

#process_frame(frame) ⇒ Object



121
122
123
124
# File 'lib/amqp/client.rb', line 121

def process_frame frame
  # this is a stub meant to be
  # replaced by the module passed into initialize
end

#receive_data(data) ⇒ Object



111
112
113
114
115
116
117
118
119
# File 'lib/amqp/client.rb', line 111

def receive_data data
  # log 'receive_data', data
  @buf << data

  while frame = Frame.parse(@buf)
    log 'receive', frame
    process_frame frame
  end
end

#reconnect(force = false) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/amqp/client.rb', line 158

def reconnect force = false
  if @reconnecting and not force
    # wait 1 second after first reconnect attempt, in between each subsequent attempt
    EM.add_timer(1){ reconnect(true) }
    return
  end

  unless @reconnecting
    @reconnecting = true

    @deferred_status = nil
    initialize(@settings)

    mqs = @channels
    @channels = {}
    mqs.each{ |_,mq| mq.reset } if mqs
  end

  log 'reconnecting'
  EM.reconnect @settings[:host], @settings[:port], self
end

#send(data, opts = {}) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/amqp/client.rb', line 126

def send data, opts = {}
  channel = opts[:channel] ||= 0
  data = data.to_frame(channel) unless data.is_a? Frame
  data.channel = channel

  log 'send', data
  send_data data.to_s
end

#unbindObject



94
95
96
97
98
# File 'lib/amqp/client.rb', line 94

def unbind
  log 'disconnected'
  @connected = false
  EM.next_tick{ @on_disconnect.call }
end