Module: Rabbitek::Consumer

Defined in:
lib/rabbitek/server/consumer.rb

Overview

Consumer helpers

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



11
12
13
# File 'lib/rabbitek/server/consumer.rb', line 11

def channel
  @channel
end

#queueObject (readonly)

Returns the value of attribute queue.



11
12
13
# File 'lib/rabbitek/server/consumer.rb', line 11

def queue
  @queue
end

#retry_or_delayed_exchangeObject (readonly)

Returns the value of attribute retry_or_delayed_exchange.



11
12
13
# File 'lib/rabbitek/server/consumer.rb', line 11

def retry_or_delayed_exchange
  @retry_or_delayed_exchange
end

#retry_or_delayed_queueObject (readonly)

Returns the value of attribute retry_or_delayed_queue.



11
12
13
# File 'lib/rabbitek/server/consumer.rb', line 11

def retry_or_delayed_queue
  @retry_or_delayed_queue
end

Class Method Details

.included(base) ⇒ Object



7
8
9
# File 'lib/rabbitek/server/consumer.rb', line 7

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#ack!(delivery_info, multiple = false) ⇒ Object



20
21
22
# File 'lib/rabbitek/server/consumer.rb', line 20

def ack!(delivery_info, multiple = false)
  channel.ack(delivery_info.delivery_tag, multiple)
end

#initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange) ⇒ Object



13
14
15
16
17
18
# File 'lib/rabbitek/server/consumer.rb', line 13

def initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange)
  @channel = channel
  @queue = queue
  @retry_or_delayed_queue = retry_or_delayed_queue
  @retry_or_delayed_exchange = retry_or_delayed_exchange
end

#jidObject



44
45
46
# File 'lib/rabbitek/server/consumer.rb', line 44

def jid
  Thread.current[:rabbit_context][:job_id]
end

#loggerObject



28
29
30
# File 'lib/rabbitek/server/consumer.rb', line 28

def logger
  Rabbitek.logger
end

#nack!(delivery_info, multiple = false, requeue = true) ⇒ Object



24
25
26
# File 'lib/rabbitek/server/consumer.rb', line 24

def nack!(delivery_info, multiple = false, requeue = true)
  channel.nack(delivery_info.delivery_tag, multiple, requeue)
end

#optsObject



55
56
57
# File 'lib/rabbitek/server/consumer.rb', line 55

def opts
  self.class.opts
end

#parse_payload(payload) ⇒ Object



32
33
34
# File 'lib/rabbitek/server/consumer.rb', line 32

def parse_payload(payload)
  Utils::Oj.load(payload)
end

#perform(_message) ⇒ Object

Raises:

  • (NotImplementedError)


36
37
38
# File 'lib/rabbitek/server/consumer.rb', line 36

def perform(_message)
  raise NotImplementedError
end

#pop_message_manuallyObject



48
49
50
51
52
53
# File 'lib/rabbitek/server/consumer.rb', line 48

def pop_message_manually
  delivery_info, properties, payload = queue.pop(manual_ack: true)
  return nil unless payload

  Message.new(delivery_info: delivery_info, properties: properties, payload: payload)
end

#set_contextObject



40
41
42
# File 'lib/rabbitek/server/consumer.rb', line 40

def set_context
  Thread.current[:rabbit_context] = { consumer: self.class.name, queue: @queue.name, job_id: SecureRandom.uuid }
end