Class: Smith::Messaging::Foo
- Inherits:
-
Object
- Object
- Smith::Messaging::Foo
- Includes:
- Logger
- Defined in:
- lib/smith/messaging/receiver.rb
Instance Attribute Summary collapse
-
#metadata ⇒ Object
Returns the value of attribute metadata.
Instance Method Summary collapse
-
#ack(multiple = false) ⇒ Object
acknowledge the message.
-
#correlation_id ⇒ Object
the correlation_id.
-
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
constructor
A new instance of Foo.
-
#reject(opts = {}) ⇒ Object
reject the message.
-
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
-
#requeue ⇒ Object
Requeue the current message on the current queue.
Methods included from Logger
Constructor Details
#initialize(metadata, data, opts = {}, requeue_opts, &blk) ⇒ Foo
Returns a new instance of Foo.
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/smith/messaging/receiver.rb', line 207 def initialize(metadata, data, opts={}, requeue_opts, &blk) @metadata = metadata @reply_queue = opts[:reply_queue] @requeue_opts = requeue_opts @acl_type_cache = AclTypeCache.instance @time = Time.now # TODO add some better error checking/diagnostics. clazz = @acl_type_cache.get_by_hash(metadata.type) raise ACL::UnknownError, "Unknown ACL: #{metadata.type}" if clazz.nil? @message = clazz.new.parse_from_string(data) if opts[:threading] EM.defer do blk.call(@message, self) ack if opts[:auto_ack] end else blk.call(@message, self) ack if opts[:auto_ack] end end |
Instance Attribute Details
#metadata ⇒ Object
Returns the value of attribute metadata.
205 206 207 |
# File 'lib/smith/messaging/receiver.rb', line 205 def metadata @metadata end |
Instance Method Details
#ack(multiple = false) ⇒ Object
acknowledge the message.
253 254 255 |
# File 'lib/smith/messaging/receiver.rb', line 253 def ack(multiple=false) @metadata.ack(multiple) end |
#correlation_id ⇒ Object
the correlation_id
263 264 265 |
# File 'lib/smith/messaging/receiver.rb', line 263 def correlation_id @metadata.correlation_id end |
#reject(opts = {}) ⇒ Object
reject the message. Optionally requeuing it.
258 259 260 |
# File 'lib/smith/messaging/receiver.rb', line 258 def reject(opts={}) @metadata.reject(opts) end |
#reply(acl = nil, &blk) ⇒ Object
Send a message to the reply_to queue as specified in the message header.
234 235 236 237 238 239 240 241 242 243 |
# File 'lib/smith/messaging/receiver.rb', line 234 def reply(acl=nil, &blk) raise ArgumentError, "you cannot supply an ACL and a blcok." if acl && blk raise ArgumentError, "you must supply either an ACL or a blcok." if acl.nil? && blk.nil? if @metadata.reply_to @reply_queue.publish((blk) ? blk.call : acl, :correlation_id => @metadata.message_id) else logger.error { "You are replying to a message that has no reply_to: #{@metadata.exchange}." } end end |
#requeue ⇒ Object
Requeue the current message on the current queue. A requeue number is added to the message header which is used to ensure the correct number of requeues.
248 249 250 |
# File 'lib/smith/messaging/receiver.rb', line 248 def requeue Requeue.new(@message, @metadata, @requeue_opts).requeue end |