Class: Shoryuken::Polling::StrictPriority
Instance Method Summary
collapse
#==, #delay
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(queues, delay = nil) ⇒ StrictPriority
Returns a new instance of StrictPriority.
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 4
# Builds the strategy state from a list of queue names.
#
# @param queues [Enumerable] queue names; a name may be listed several times
# @param delay [Object, nil] stored as-is in @delay (its use is not visible here)
def initialize(queues, delay = nil)
  # Collapse duplicate names; names listed more often are ordered first.
  occurrences = queues.group_by { |name| name }
  @queues = occurrences.sort_by { |_, copies| -copies.size }.map { |name, _| name }

  # Every queue starts with a pause deadline at the epoch; message_processed
  # writes the same value when it un-pauses a queue.
  @paused_until = {}
  queues.each { |name| @paused_until[name] = Time.at(0) }

  @delay = delay
  reset_next_queue
end
|
Instance Method Details
#active_queues ⇒ Object
33
34
35
36
37
38
39
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 33
# Lists the queues that are not currently paused, each paired with a weight.
#
# The weight is the queue's position counted from the END of @queues,
# starting at 1, so the first entry of @queues gets the largest number.
# The result keeps the ordering of @queues.
#
# @return [Array<Array>] [queue, weight] pairs
def active_queues
  # Number the queues from the back of the list, starting at 1.
  weighted = @queues.reverse.each_with_index.map { |name, offset| [name, offset + 1] }
  unpaused = weighted.reject { |name, _| queue_paused?(name) }
  # Flip back so the result follows the original @queues ordering.
  unpaused.reverse
end
|
#message_processed(queue) ⇒ Object
41
42
43
44
45
46
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 41
# Lifts the pause on +queue+ if it is currently paused; otherwise does nothing.
#
# @param queue [Object] key into @paused_until
# @return [Time, nil] the reset deadline when the queue was paused, else nil
def message_processed(queue)
  return unless queue_paused?(queue)

  logger.debug "Unpausing #{queue}"
  # An epoch deadline is the same value initialize assigns to every queue.
  @paused_until[queue] = Time.at(0)
end
|
#messages_found(queue, messages_found) ⇒ Object
25
26
27
28
29
30
31
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 25
# Reacts to the outcome of a poll on +queue+.
#
# A count equal to 0 pauses the queue; any other value calls
# reset_next_queue (defined outside this view).
#
# @param queue [Object] the queue that was polled
# @param messages_found [Integer] number of messages the poll returned
# @return [Object] whatever pause or reset_next_queue returns
def messages_found(queue, messages_found)
  return pause(queue) if messages_found == 0

  reset_next_queue
end
|
#next_queue ⇒ Object
20
21
22
23
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 20
# Picks the next queue to poll via next_active_queue and wraps it.
#
# @return [QueueConfiguration, nil] the queue wrapped with an empty options
#   hash, or nil when next_active_queue returns nil
def next_queue
  candidate = next_active_queue
  return nil if candidate.nil?

  QueueConfiguration.new(candidate, {})
end
|