Module: Vx::Common::AMQP::Consumer::Subscribe
- Included in: ClassMethods
- Defined in: lib/vx/common/amqp/consumer/subscribe.rb
Instance Method Summary
Instance Method Details
#pop(q) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 32

# Fetches a single message from the given queue and returns it together
# with its delivery metadata.
#
# The queue is asked for one message via +q.pop(ack: ack)+. When a payload
# comes back, the "unpacking.consumer.amqp" event is reported through
# +instrument+ and the unpacked value is produced inside that block.
#
# @param q [Object] queue-like object responding to +pop(ack:)+
# @return [Array] +[unpacked, delivery_info, properties]+; +unpacked+ is
#   +nil+ when the queue returned no payload
def pop(q)
  delivery_info, properties, payload = q.pop(ack: ack)

  # Built unconditionally, exactly as in the original listing, so the
  # consumer accessors are invoked whether or not a payload arrived.
  context = {
    properties:  properties,
    consumer_id: consumer_id,
    consumer:    consumer_name
  }

  message = nil
  if payload
    instrument("unpacking.consumer.amqp", context) do
      # NOTE(review): this listing stores the raw [properties, payload] pair.
      # A deserialising call may have been dropped when the page was
      # extracted -- confirm against the repository source.
      message = [properties, payload]
    end
  end

  [message, delivery_info, properties]
end
#start ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 14

# Opens the session and, inside a channel, declares the exchange and the
# queue, binds the queue to the exchange and hands both to the caller's block.
#
# "start.consumer.amqp" is reported through +instrument+ before the queue is
# bound, and "shutdown.consumer.amqp" after the block returns. The shutdown
# event is not reported if the block raises, since there is no +ensure+.
#
# @yield [exchange, queue] the declared exchange and the bound queue
# @return [Object, nil] the value returned by the block, or +nil+ when no
#   block is given
def start
  result = nil
  session.open

  session.with_channel do
    exchange = declare_exchange
    queue    = declare_queue
    context  = {
      exchange: exchange.name,
      queue:    queue.name,
      consumer: consumer_id
    }

    instrument("start.consumer.amqp", context)

    # NOTE(review): the extracted listing reads `q.bind(x, )`, which Ruby
    # evaluates as a single-argument call. A second argument (bind options?)
    # may have been lost in extraction -- confirm against the repository.
    queue.bind(exchange)

    result = yield(exchange, queue) if block_given?

    instrument("shutdown.consumer.amqp", context)
  end

  result
end
#subscribe ⇒ Object
8 9 10 11 12 |
# File 'lib/vx/common/amqp/consumer/subscribe.rb', line 8

# Starts the consumer and runs the subscription loop on the declared queue.
# The exchange yielded by +start+ is not used here.
#
# @return [Object, nil] whatever +start+ returns, i.e. the value of
#   +subscription_loop+
def subscribe
  start { |_exchange, queue| subscription_loop(queue) }
end