Class: Smith::Messaging::Foo

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/smith/messaging/receiver.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

included

Constructor Details

#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo

Returns a new instance of Foo.

Raises:



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/smith/messaging/receiver.rb', line 207

def initialize(, data, opts={}, requeue_opts, &blk)
  @metadata = 
  @reply_queue = opts[:reply_queue]
  @requeue_opts = requeue_opts

  @acl_type_cache = AclTypeCache.instance

  @time = Time.now

  # TODO add some better error checking/diagnostics.
  clazz = @acl_type_cache.get_by_hash(.type)
  raise ACL::UnknownError, "Unknown ACL: #{.type}" if clazz.nil?

  @message = clazz.new.parse_from_string(data)

  if opts[:threading]
    EM.defer do
      blk.call(@message, self)
      ack if opts[:auto_ack]
    end
  else
    blk.call(@message, self)
    ack if opts[:auto_ack]
  end
end

Instance Attribute Details

#metadataObject

Returns the value of attribute metadata.



205
206
207
# File 'lib/smith/messaging/receiver.rb', line 205

def 
  @metadata
end

Instance Method Details

#ack(multiple = false) ⇒ Object

acknowledge the message.



253
254
255
# File 'lib/smith/messaging/receiver.rb', line 253

def ack(multiple=false)
  @metadata.ack(multiple)
end

#correlation_idObject

the correlation_id



263
264
265
# File 'lib/smith/messaging/receiver.rb', line 263

def correlation_id
  @metadata.correlation_id
end

#reject(opts = {}) ⇒ Object

reject the message. Optionally requeuing it.



258
259
260
# File 'lib/smith/messaging/receiver.rb', line 258

def reject(opts={})
  @metadata.reject(opts)
end

#reply(acl = nil, &blk) ⇒ Object

Send a message to the reply_to queue as specified in the message header.

Raises:

  • (ArgumentError)


234
235
236
237
238
239
240
241
242
243
# File 'lib/smith/messaging/receiver.rb', line 234

def reply(acl=nil, &blk)
  raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk
  raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil?

  if @metadata.reply_to
    @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.message_id)
  else
    logger.error { "You are replying to a message that has no reply_to: #{@metadata.exchange}." }
  end
end

#requeueObject

Requeue the current mesage on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.



248
249
250
# File 'lib/smith/messaging/receiver.rb', line 248

def requeue
  Requeue.new(@message, @metadata, @requeue_opts).requeue
end