Class: Shoryuken::Queue
- Inherits:
-
Object
show all
- Includes:
- Util
- Defined in:
- lib/shoryuken/queue.rb
Constant Summary
collapse
- FIFO_ATTR =
'FifoQueue'.freeze
- MESSAGE_GROUP_ID =
'ShoryukenMessage'.freeze
- VISIBILITY_TIMEOUT_ATTR =
'VisibilityTimeout'.freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(client, name_or_url_or_arn) ⇒ Queue
Returns a new instance of Queue.
11
12
13
14
|
# File 'lib/shoryuken/queue.rb', line 11
# Builds a queue wrapper around the given client.
#
# @param client [Object] assigned through the client= writer
# @param name_or_url_or_arn [Object] queue identifier passed straight to
#   set_name_and_url (presumably a queue name, URL, or ARN, as the parameter
#   name suggests — that helper is defined outside this view)
def initialize(client, name_or_url_or_arn)
self.client = client
set_name_and_url(name_or_url_or_arn)
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
9
10
11
|
# File 'lib/shoryuken/queue.rb', line 9
# Reader for the client this queue was built with.
#
# @return [Object] the value held in @client
def client
@client
end
|
#name ⇒ Object
Returns the value of attribute name.
9
10
11
|
# File 'lib/shoryuken/queue.rb', line 9
# Reader for the queue name.
#
# @return [Object] the value held in @name
def name
@name
end
|
#url ⇒ Object
Returns the value of attribute url.
9
10
11
|
# File 'lib/shoryuken/queue.rb', line 9
# Reader for the queue URL; the request-building methods in this class pass
# it along as :queue_url.
#
# @return [Object] the value held in @url
def url
@url
end
|
Instance Method Details
#delete_messages(options) ⇒ Object
23
24
25
26
27
28
29
30
31
|
# File 'lib/shoryuken/queue.rb', line 23
# Deletes a batch of messages from this queue and logs every entry that
# could not be deleted.
#
# The logging is done in a plain iteration rather than inside `any?`, so all
# failures are reported (not only the first one) and the return value does
# not depend on what the logger's #error method happens to return.
#
# @param options [Hash] arguments for the client's delete_message_batch call;
#   :queue_url is set to this queue's url
# @return [Boolean] true when at least one entry failed to delete
def delete_messages(options)
  response = client.delete_message_batch(options.merge(queue_url: url))
  # A missing failure list is treated the same as an empty one.
  failures = response.failed || []

  failures.each do |failure|
    logger.error do
      "Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}"
    end
  end

  !failures.empty?
end
|
#fifo? ⇒ Boolean
49
50
51
52
53
54
55
56
|
# File 'lib/shoryuken/queue.rb', line 49
# Tells whether the queue attributes mark this queue as FIFO.
#
# The answer is computed once and memoized in @_fifo; `defined?` is used so
# that a cached `false` is not recomputed.
#
# @return [Boolean] true when the FIFO_ATTR attribute equals the string 'true'
def fifo?
  unless defined?(@_fifo)
    fifo_flag = queue_attributes.attributes[FIFO_ATTR]
    @_fifo = (fifo_flag == 'true')
  end

  @_fifo
end
|
#receive_messages(options) ⇒ Object
45
46
47
|
# File 'lib/shoryuken/queue.rb', line 45
# Fetches messages from this queue and wraps each one in a Message.
#
# @param options [Hash] arguments for the client's receive_message call;
#   :queue_url is set to this queue's url
# @return [Array<Message>] one Message per entry in the response, each built
#   with the client, this queue, and the raw entry
def receive_messages(options)
  sqs = client
  request = options.merge(queue_url: url)
  response = sqs.receive_message(request)

  response.messages.map do |raw|
    Message.new(client, self, raw)
  end
end
|
#send_message(options) ⇒ Object
33
34
35
36
37
38
39
|
# File 'lib/shoryuken/queue.rb', line 33
# Sends a single message to this queue, running it through the client
# middleware chain.
#
# @param options [Object] message description; it is passed through
#   sanitize_message! and the result is merged with :queue_url
# @return [Object] whatever Shoryuken.client_middleware.invoke returns
def send_message(options)
  sanitized = sanitize_message!(options)
  payload = sanitized.merge(queue_url: url)

  # The middleware chain and the client both receive the same final payload.
  Shoryuken.client_middleware.invoke(payload) { client.send_message(payload) }
end
|
#send_messages(options) ⇒ Object
41
42
43
|
# File 'lib/shoryuken/queue.rb', line 41
# Sends a batch of messages to this queue.
#
# @param options [Object] batch description; it is passed through
#   sanitize_messages! and the result is merged with :queue_url
# @return [Object] the result of the client's send_message_batch call
def send_messages(options)
  sqs = client
  batch = sanitize_messages!(options).merge(queue_url: url)
  sqs.send_message_batch(batch)
end
|
#visibility_timeout ⇒ Object
16
17
18
19
20
21
|
# File 'lib/shoryuken/queue.rb', line 16
# Reads the queue's visibility timeout from its attributes.
#
# When Shoryuken.cache_visibility_timeout? is false the memoized value is
# cleared first, so the attributes are looked up again on every call;
# otherwise the first value read is reused.
#
# @return [Integer] the VISIBILITY_TIMEOUT_ATTR attribute converted with to_i
def visibility_timeout
  caching = Shoryuken.cache_visibility_timeout?
  @_visibility_timeout = nil unless caching
  return @_visibility_timeout if @_visibility_timeout

  raw_timeout = queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR]
  @_visibility_timeout = raw_timeout.to_i
end
|