Class: Legion::Transport::Queue

Inherits:
CONNECTOR::Queue
  • Object
show all
Includes:
Common
Defined in:
lib/legion/transport/queue.rb

Instance Method Summary collapse

Methods included from Common

#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder

Constructor Details

#initialize(queue = queue_name, options = {}) ⇒ Queue

Returns a new instance of Queue.



6
7
8
9
10
11
12
13
14
# File 'lib/legion/transport/queue.rb', line 6

def initialize(queue = queue_name, options = {})
  retries ||= 0
  @options = options
  super(channel, queue, options_builder(default_options, queue_options, options))
rescue Legion::Transport::CONNECTOR::PreconditionFailed
  retries.zero? ? retries = 1 : raise
  recreate_queue(channel, queue)
  retry
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Object



58
59
60
# File 'lib/legion/transport/queue.rb', line 58

def acknowledge(delivery_tag)
  channel.acknowledge(delivery_tag)
end

#default_optionsObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/transport/queue.rb', line 22

def default_options
  hash = {}
  hash[:manual_ack] = true
  hash[:durable] = true
  hash[:exclusive] = false
  hash[:block] = false
  hash[:auto_delete] = false
  hash[:arguments] = {
    'x-max-priority': 255,
    'x-overflow': 'reject-publish',
    'x-dead-letter-exchange': "#{self.class.ancestors.first.to_s.split('::')[2].downcase}.dlx"
  }
  hash
end

#delete(options = { if_unused: true, if_empty: true }) ⇒ Object



51
52
53
54
55
56
# File 'lib/legion/transport/queue.rb', line 51

def delete(options = { if_unused: true, if_empty: true })
  super(options)
  true
rescue Legion::Transport::CONNECTOR::PreconditionFailed
  false
end

#queue_nameObject



41
42
43
44
45
46
47
48
49
# File 'lib/legion/transport/queue.rb', line 41

def queue_name
  ancestor = self.class.ancestors.first.to_s.split('::')
  name = if ancestor[5].scan(/[A-Z]/).length > 1
           ancestor[5].gsub!(/(.)([A-Z])/, '\1_\2').downcase!
         else
           ancestor[5].downcase!
         end
  "#{ancestor[2].downcase}.#{name}"
end

#queue_optionsObject



37
38
39
# File 'lib/legion/transport/queue.rb', line 37

def queue_options
  {}
end

#recreate_queue(channel, queue) ⇒ Object



16
17
18
19
20
# File 'lib/legion/transport/queue.rb', line 16

def recreate_queue(channel, queue)
  Legion::Logging.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
  queue = ::Bunny::Queue.new(channel, queue, no_declare: true, passive: true)
  queue.delete(if_empty: true)
end

#reject(delivery_tag, requeue: false) ⇒ Object



62
63
64
# File 'lib/legion/transport/queue.rb', line 62

def reject(delivery_tag, requeue: false)
  channel.reject(delivery_tag, requeue)
end