Class: Legion::Transport::Queue
- Inherits:
-
CONNECTOR::Queue
- Object
- CONNECTOR::Queue
- Legion::Transport::Queue
show all
- Includes:
- Common
- Defined in:
- lib/legion/transport/queue.rb
Instance Method Summary
collapse
Methods included from Common
#channel, #channel_open?, #close, #close!, #deep_merge, #generate_consumer_tag, #open_channel, #options_builder
Constructor Details
#initialize(queue = queue_name, options = {}) ⇒ Queue
Returns a new instance of Queue.
6
7
8
9
10
11
12
13
14
|
# File 'lib/legion/transport/queue.rb', line 6
def initialize(queue = queue_name, options = {})
retries ||= 0
@options = options
super(channel, queue, options_builder(default_options, queue_options, options))
rescue Legion::Transport::CONNECTOR::PreconditionFailed
retries.zero? ? retries = 1 : raise
recreate_queue(channel, queue)
retry
end
|
Instance Method Details
#acknowledge(delivery_tag) ⇒ Object
58
59
60
|
# File 'lib/legion/transport/queue.rb', line 58
def acknowledge(delivery_tag)
channel.acknowledge(delivery_tag)
end
|
#default_options ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/legion/transport/queue.rb', line 22
def default_options
hash = {}
hash[:manual_ack] = true
hash[:durable] = true
hash[:exclusive] = false
hash[:block] = false
hash[:auto_delete] = false
hash[:arguments] = {
'x-max-priority': 255,
'x-overflow': 'reject-publish',
'x-dead-letter-exchange': "#{self.class.ancestors.first.to_s.split('::')[2].downcase}.dlx"
}
hash
end
|
#delete(options = { if_unused: true, if_empty: true }) ⇒ Object
51
52
53
54
55
56
|
# File 'lib/legion/transport/queue.rb', line 51
def delete(options = { if_unused: true, if_empty: true })
super(options)
true
rescue Legion::Transport::CONNECTOR::PreconditionFailed
false
end
|
#queue_name ⇒ Object
41
42
43
44
45
46
47
48
49
|
# File 'lib/legion/transport/queue.rb', line 41
def queue_name
ancestor = self.class.ancestors.first.to_s.split('::')
name = if ancestor[5].scan(/[A-Z]/).length > 1
ancestor[5].gsub!(/(.)([A-Z])/, '\1_\2').downcase!
else
ancestor[5].downcase!
end
"#{ancestor[2].downcase}.#{name}"
end
|
#queue_options ⇒ Object
37
38
39
|
# File 'lib/legion/transport/queue.rb', line 37
def queue_options
{}
end
|
#recreate_queue(channel, queue) ⇒ Object
16
17
18
19
20
|
# File 'lib/legion/transport/queue.rb', line 16
def recreate_queue(channel, queue)
Legion::Logging.warn "Queue:#{queue} exists with wrong parameters, deleting and creating"
queue = ::Bunny::Queue.new(channel, queue, no_declare: true, passive: true)
queue.delete(if_empty: true)
end
|
#reject(delivery_tag, requeue: false) ⇒ Object
62
63
64
|
# File 'lib/legion/transport/queue.rb', line 62
def reject(delivery_tag, requeue: false)
channel.reject(delivery_tag, requeue)
end
|