Module: Rabbitek::Consumer
- Defined in:
- lib/rabbitek/server/consumer.rb
Overview
Consumer helpers
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#retry_or_delayed_exchange ⇒ Object
readonly
Returns the value of attribute retry_or_delayed_exchange.
-
#retry_or_delayed_queue ⇒ Object
readonly
Returns the value of attribute retry_or_delayed_queue.
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #ack!(delivery_info, multiple = false) ⇒ Object
- #initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange) ⇒ Object
- #jid ⇒ Object
- #logger ⇒ Object
- #nack!(delivery_info, multiple = false, requeue = true) ⇒ Object
- #opts ⇒ Object
- #parse_payload(payload) ⇒ Object
- #perform(_message) ⇒ Object
- #pop_message_manually ⇒ Object
- #set_context ⇒ Object
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
11 12 13 |
# File 'lib/rabbitek/server/consumer.rb', line 11 def channel @channel end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
11 12 13 |
# File 'lib/rabbitek/server/consumer.rb', line 11 def queue @queue end |
#retry_or_delayed_exchange ⇒ Object (readonly)
Returns the value of attribute retry_or_delayed_exchange.
11 12 13 |
# File 'lib/rabbitek/server/consumer.rb', line 11 def retry_or_delayed_exchange @retry_or_delayed_exchange end |
#retry_or_delayed_queue ⇒ Object (readonly)
Returns the value of attribute retry_or_delayed_queue.
11 12 13 |
# File 'lib/rabbitek/server/consumer.rb', line 11 def retry_or_delayed_queue @retry_or_delayed_queue end |
Class Method Details
.included(base) ⇒ Object
7 8 9 |
# File 'lib/rabbitek/server/consumer.rb', line 7 def self.included(base) base.extend(ClassMethods) end |
Instance Method Details
#ack!(delivery_info, multiple = false) ⇒ Object
20 21 22 |
# File 'lib/rabbitek/server/consumer.rb', line 20 def ack!(delivery_info, multiple = false) channel.ack(delivery_info.delivery_tag, multiple) end |
#initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange) ⇒ Object
13 14 15 16 17 18 |
# File 'lib/rabbitek/server/consumer.rb', line 13 def initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange) @channel = channel @queue = queue @retry_or_delayed_queue = retry_or_delayed_queue @retry_or_delayed_exchange = retry_or_delayed_exchange end |
#jid ⇒ Object
44 45 46 |
# File 'lib/rabbitek/server/consumer.rb', line 44 def jid Thread.current[:rabbit_context][:job_id] end |
#logger ⇒ Object
28 29 30 |
# File 'lib/rabbitek/server/consumer.rb', line 28 def logger Rabbitek.logger end |
#nack!(delivery_info, multiple = false, requeue = true) ⇒ Object
24 25 26 |
# File 'lib/rabbitek/server/consumer.rb', line 24 def nack!(delivery_info, multiple = false, requeue = true) channel.nack(delivery_info.delivery_tag, multiple, requeue) end |
#opts ⇒ Object
55 56 57 |
# File 'lib/rabbitek/server/consumer.rb', line 55 def opts self.class.opts end |
#parse_payload(payload) ⇒ Object
32 33 34 |
# File 'lib/rabbitek/server/consumer.rb', line 32 def parse_payload(payload) Utils::Oj.load(payload) end |
#perform(_message) ⇒ Object
36 37 38 |
# File 'lib/rabbitek/server/consumer.rb', line 36 def perform(_message) raise NotImplementedError end |
#pop_message_manually ⇒ Object
48 49 50 51 52 53 |
# File 'lib/rabbitek/server/consumer.rb', line 48 def pop_message_manually delivery_info, properties, payload = queue.pop(manual_ack: true) return nil unless payload Message.new(delivery_info: delivery_info, properties: properties, payload: payload) end |
#set_context ⇒ Object
40 41 42 |
# File 'lib/rabbitek/server/consumer.rb', line 40 def set_context Thread.current[:rabbit_context] = { consumer: self.class.name, queue: @queue.name, job_id: SecureRandom.uuid } end |