Module: Mumukit::Nuntius::Consumer
- Defined in:
- lib/mumukit/nuntius/consumer.rb
Class Method Summary collapse
- .handle_message(channel, delivery_info, properties, body, &block) ⇒ Object
- .negligent_start!(queue_name, &block) ⇒ Object
- .parse_body(body) ⇒ Object
- .start(queue_name, exchange_name, &block) ⇒ Object
- .subscribe(queue, channel, &block) ⇒ Object
Class Method Details
.handle_message(channel, delivery_info, properties, body, &block) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/mumukit/nuntius/consumer.rb', line 41
#
# Hands one delivered message to the consumer block and acknowledges it.
#
# The block is called with +delivery_info+, +properties+ and the result of
# +parse_body(body)+. When that call and the subsequent ack both complete,
# the delivery is acknowledged; when any StandardError is raised along the
# way, a warning is logged and the delivery is negatively acknowledged.
#
# @param channel       object responding to +ack+ and +nack+
# @param delivery_info object responding to +delivery_tag+
# @param properties    message properties, forwarded untouched to the block
# @param body          raw message payload, passed through +parse_body+
# @yield [delivery_info, properties, parsed_body]
def handle_message(channel, delivery_info, properties, body, &block)
  block.call delivery_info, properties, parse_body(body)
  channel.ack delivery_info.delivery_tag
rescue => e
  # Covers parse failures, errors raised by the block, and a failing ack.
  Mumukit::Nuntius::Logger.warn "Failed to read body: #{e.message} \n #{e.backtrace}"
  # NOTE(review): positional flags are presumably multiple=false, requeue=true
  # (Bunny's Channel#nack ordering) -- confirm against the AMQP client in use.
  channel.nack delivery_info.delivery_tag, false, true
end
.negligent_start!(queue_name, &block) ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/mumukit/nuntius/consumer.rb', line 22
#
# Starts a consumer whose exchange name equals its queue name and whose
# handler only receives the message body.
#
# Any StandardError raised by the handler is logged as an error and
# swallowed, so it never propagates out of the inner block.
#
# @param queue_name name passed to +start+ as both queue and exchange name
# @yield [body] the third value handed over by +start+
def negligent_start!(queue_name, &block)
  start(queue_name, queue_name) do |_delivery_info, _properties, body|
    begin
      block.call(body)
    rescue => e
      Mumukit::Nuntius::Logger.error("#{queue_name} item couldn't be processed #{e}. body was: #{body}")
    end
  end
end
.parse_body(body) ⇒ Object
49 50 51 |
# File 'lib/mumukit/nuntius/consumer.rb', line 49
#
# Decodes a raw message payload.
#
# The payload is parsed as JSON and +with_indifferent_access+ is called on
# the result (presumably ActiveSupport's Hash extension -- confirm which
# library supplies it in this project).
#
# @param body [String] JSON text
# @return the parsed document after +with_indifferent_access+
def parse_body(body)
  decoded = JSON.parse(body)
  decoded.with_indifferent_access
end
.start(queue_name, exchange_name, &block) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/mumukit/nuntius/consumer.rb', line 5
#
# Attaches to a durable queue bound to the given exchange and consumes from
# it until interrupted.
#
# Steps, in order: log the attachment, establish the connection, open a
# channel/exchange pair, declare the queue with +durable: true+, bind it to
# the exchange, set prefetch to 1, then delegate to +subscribe+. An
# Interrupt raised while subscribed is logged and absorbed; the channel is
# closed on the way out in every case.
#
# @param queue_name    name of the queue to declare and consume from
# @param exchange_name name handed to +Connection.start_channel+
# @yield forwarded to +subscribe+ as the message handler
def start(queue_name, exchange_name, &block)
  logger = Mumukit::Nuntius::Logger
  connection = Mumukit::Nuntius::Connection

  logger.info "Attaching to queue #{queue_name}"
  connection.establish_connection

  amqp_channel, amqp_exchange = connection.start_channel(exchange_name)
  bound_queue = amqp_channel.queue(queue_name, durable: true)
  bound_queue.bind(amqp_exchange)
  amqp_channel.prefetch(1)

  begin
    subscribe(bound_queue, amqp_channel, &block)
  rescue Interrupt
    logger.info "Leaving queue #{queue_name}"
  ensure
    amqp_channel.close
  end
end
.subscribe(queue, channel, &block) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/mumukit/nuntius/consumer.rb', line 32
#
# Subscribes to +queue+ and routes every delivery through +handle_message+.
#
# The subscription is opened with +manual_ack: true+ and +block: true+;
# acknowledging is left to +handle_message+, which receives +channel+ for
# that purpose.
#
# @param queue   object responding to +subscribe(options) { |delivery_info, properties, body| }+
# @param channel forwarded to +handle_message+ together with each delivery
# @yield forwarded to +handle_message+ as the consumer block
def subscribe(queue, channel, &block)
  Mumukit::Nuntius::Logger.debug "Subscribed to queue #{queue}"
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
    Mumukit::Nuntius::Logger.debug "Processing message #{body}"
    handle_message channel, delivery_info, properties, body, &block
  end
end