Class: RabbitMQClient::Exchange

Inherits:
Object
  • Object
show all
Defined in:
lib/jessica/rabbitmq_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, type, channel, opts = {}) ⇒ Exchange

Returns a new instance of Exchange.



215
216
217
218
219
220
221
222
223
224
# File 'lib/jessica/rabbitmq_client.rb', line 215

# Stores the exchange settings and declares the exchange on +channel+.
#
# @param name the exchange name
# @param type [#to_s] the exchange type; sent to the channel as a string
# @param channel the channel object that receives +exchange_declare+
# @param opts [Hash] optional settings
# @option opts :durable durability flag (declared as false when omitted)
# @option opts :auto_delete auto-delete flag (declared as false when omitted)
# @option opts :args extra declaration arguments, passed through unchanged
def initialize(name, type, channel, opts = {})
  @name = name
  @type = type
  @channel = channel
  @durable, @auto_delete, @args = opts.values_at(:durable, :auto_delete, :args)

  # Both flags are sent as real booleans: an omitted option is declared as
  # false. The instance variables keep the raw option values (possibly nil).
  @channel.exchange_declare(@name, @type.to_s, @durable || false, @auto_delete || false, @args)
end

Instance Attribute Details

#durable ⇒ Object (readonly)

Returns the value of attribute durable.



213
214
215
# File 'lib/jessica/rabbitmq_client.rb', line 213

# Returns the +:durable+ option this exchange was constructed with
# (+nil+ when the option was not supplied).
def durable
  @durable
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



213
214
215
# File 'lib/jessica/rabbitmq_client.rb', line 213

# Returns the exchange name that was passed to the constructor.
def name
  @name
end

Instance Method Details

#match_opts(opts) ⇒ Object



226
227
228
229
# File 'lib/jessica/rabbitmq_client.rb', line 226

# Tells whether +opts+ describe the same declaration settings as this exchange.
#
# The two boolean flags are compared after treating a missing (+nil+) value as
# +false+, because that is how an omitted flag is declared on the channel.
# +:args+ is compared as given.
#
# @param opts [Hash] options to compare (+:durable+, +:auto_delete+, +:args+)
# @return [Boolean] true when all three settings agree
def match_opts(opts)
  wanted_durable, wanted_auto_delete, wanted_args = opts.values_at(:durable, :auto_delete, :args)

  (@durable || false) == (wanted_durable || false) &&
    (@auto_delete || false) == (wanted_auto_delete || false) &&
    @args == wanted_args
end

#publish(msg, opts) ⇒ Object

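Example options hash (illustrative values; when omitted, :mandatory, :immediate and :persistent all default to false):

PUBLISH_OPTS = {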

  :immediate => false,
  :mandatory => true,
  :persistent => true,
  :routing_key => 'messages.xyz'
}

Parameters:

  • opts (Hash)

    a customizable set of options



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/jessica/rabbitmq_client.rb', line 260

# Publishes +msg+ through +internal_publish+ and optionally waits for broker
# feedback.
#
# @param msg the message payload, handed to +internal_publish+ unchanged
# @param opts [Hash] publishing options
# @option opts :routing_key routing key (+:key+ is used as a fallback)
# @option opts :mandatory (false)
# @option opts :immediate (false)
# @option opts :persistent (false) selects the persistent property set
# @option opts :headers headers assigned to the message properties
# @option opts :type ('basic') 'minimal', 'basic', or anything else for text/plain
# @option opts :listen_for_returns install a return listener and wait for a reply
# @option opts :listen_for_confirms install a confirm listener and wait for a reply
# @option opts :time_out (0.01) seconds to wait for feedback after publishing
# @return [Object, nil] nil when no feedback was requested; otherwise the first
#   reply handed to a listener, or <tt>{:kind => "TIME_OUT"}</tt> if none
#   arrived within +:time_out+ seconds
def publish(msg, opts)
  routing_key = opts[:routing_key] || opts[:key]
  mandatory   = opts[:mandatory]  || false
  immediate   = opts[:immediate]  || false
  persistent  = opts[:persistent] || false
  headers     = opts[:headers]

  properties =
    case opts[:type] || 'basic'
    when 'minimal'
      if persistent
        RabbitMQClient::MessageProperties::MINIMAL_PERSISTENT_BASIC
      else
        RabbitMQClient::MessageProperties::MINIMAL_BASIC
      end
    when 'basic'
      if persistent
        RabbitMQClient::MessageProperties::PERSISTENT_BASIC
      else
        RabbitMQClient::MessageProperties::BASIC
      end
    else
      if persistent
        RabbitMQClient::MessageProperties::PERSISTENT_TEXT_PLAIN
      else
        RabbitMQClient::MessageProperties::TEXT_PLAIN
      end
    end

  # NOTE(review): the setter is invoked on the very object the constant above
  # refers to, so these headers presumably stay attached for later publishes
  # that pick the same constant -- confirm, and copy the properties if so.
  properties.headers = headers if headers

  returns, confirms = opts.values_at(:listen_for_returns, :listen_for_confirms)

  unless returns || confirms
    internal_publish msg, routing_key, properties, mandatory, immediate
    return nil
  end

  time_out = opts[:time_out] || 0.01
  # Unbounded on purpose: a second reply (or the time-out marker) must never
  # block the thread that delivers it once the first reply has been taken.
  feedback_queue = Queue.new
  on_reply = lambda { |reply| feedback_queue << reply }
  timer = nil

  begin
    @channel.return_listener = ReturnedMessageListener.new(on_reply) if returns
    @channel.confirm_listener = ConfirmedMessageListener.new(on_reply) if confirms

    internal_publish msg, routing_key, properties, mandatory, immediate

    # The clock starts only after the message has been handed over.
    timer = Thread.new(time_out) do |seconds|
      sleep seconds
      feedback_queue << {:kind => "TIME_OUT"}
    end

    feedback_queue.pop
  ensure
    # Runs on failure too, so listeners never stay installed on the channel
    # and the timer thread does not outlive the call.
    timer.kill if timer
    @channel.return_listener = nil if returns
    @channel.confirm_listener = nil if confirms
  end
end