Module: Resque::DynamicQueues
- Defined in:
- lib/resque/dynamic_queues.rb
Constant Summary collapse
- DEFAULT_QUEUE_DEPTH =
0
Class Method Summary collapse
Instance Method Summary collapse
- #filter_busy_queues(qs) ⇒ Object
- #queue_depth(queuename) ⇒ Object
-
#queues_with_dynamic ⇒ Object
Returns a list of queues to use when searching for a job.
- #reserve_with_round_robin ⇒ Object
- #rotated_queues ⇒ Object
- #should_work_on_queue?(queuename) ⇒ Boolean
Class Method Details
.included(receiver) ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/resque/dynamic_queues.rb', line 133 def self.included(receiver) receiver.class_eval do alias queues_without_dynamic queues alias queues queues_with_dynamic alias reserve_without_round_robin reserve alias reserve reserve_with_round_robin end end |
Instance Method Details
#filter_busy_queues(qs) ⇒ Object
3 4 5 6 |
# File 'lib/resque/dynamic_queues.rb', line 3 def filter_busy_queues qs busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact Array(qs.dup).compact - busy_queues end |
#queue_depth(queuename) ⇒ Object
20 21 22 23 24 |
# File 'lib/resque/dynamic_queues.rb', line 20 def queue_depth queuename busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact # find the queuename, count it. busy_queues.select {|q| q == queuename }.size end |
#queues_with_dynamic ⇒ Object
Returns a list of queues to use when searching for a job.
A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
The splat can also be used as a wildcard within a queue name, e.g. “high”, and negation can be indicated with a prefix of “!”
An @key can be used to dynamically look up the queue list for key from redis. If no key is supplied, it defaults to the worker’s hostname, and wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with Resque.set_dynamic_queue(key, [“q1”, “q2”]
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/resque/dynamic_queues.rb', line 83 def queues_with_dynamic queue_names = @queues.dup return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).size == 0 real_queues = Resque.queues matched_queues = [] #Remove Queues under Api Limits api_limit_instances = Redis.current.keys('APILimits:*').map {|key| key.split('APILimits:').last.to_i} real_queues = real_queues.select {|key| key if !api_limit_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)} ## 2 #Queue Pausing paused_instances = Redis.current.keys('resque:PauseQueue:*').map {|key| key.split('resque:PauseQueue:').last.to_i} real_queues = real_queues.select {|key| key if !paused_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)} while q = queue_names.shift q = q.to_s if q =~ /^(!)?@(.*)/ key = $2.strip key = hostname if key.size == 0 add_queues = Resque.get_dynamic_queue(key) add_queues.map! { |q| q.gsub!(/^!/, '') || q.gsub!(/^/, '!') } if $1 queue_names.concat(add_queues) next end if q =~ /^!/ negated = true q = q[1..-1] end patstr = q.gsub(/\*/, '.*') pattern = /^#{patstr}$/ if negated matched_queues -= matched_queues.grep(pattern) else matches = real_queues.grep(/^#{pattern}$/) matches = [q] if matches.size == 0 && q == patstr matched_queues.concat(matches.sort) end end return matched_queues.uniq end |
#reserve_with_round_robin ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/resque/dynamic_queues.rb', line 40 def reserve_with_round_robin grouped_queues = queues.sort.group_by{|u| /(\d{1,20})_.*/.match(u) ? /(\d{1,20})_.*/.match(u).captures.first : nil} #Instance queue grouping if !grouped_queues.keys.include?(nil) && grouped_queues.keys.size > 0 @n ||= 0 @n += 1 @n = @n % grouped_queues.keys.size grouped_queues.keys.rotate(@n).each do |key| grouped_queues[key].each do |queue| log! "Checking #{queue}" if should_work_on_queue?(queue) && @job_in_progress = Resque::Job.reserve(queue) log! "Found job on #{queue}" return @job_in_progress end end @n += 1 # Start the next search at the queue after the one from which we pick a job. end nil else return reserve_without_round_robin end rescue Exception => e log "Error reserving job: #{e.inspect}" log e.backtrace.join("\n") raise e end |
#rotated_queues ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/resque/dynamic_queues.rb', line 8 def rotated_queues @n ||= 0 @n += 1 rot_queues = queues # since we rely on the resque-dynamic-queues plugin, this is all the queues, expanded out if rot_queues.size > 0 @n = @n % rot_queues.size rot_queues.rotate(@n) else rot_queues end end |
#should_work_on_queue?(queuename) ⇒ Boolean
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/resque/dynamic_queues.rb', line 27 def should_work_on_queue? queuename return true if @queues.include? '*' # workers with QUEUES=* are special and are not subject to queue depth setting max = DEFAULT_QUEUE_DEPTH unless ENV["RESQUE_QUEUE_DEPTH"].nil? || ENV["RESQUE_QUEUE_DEPTH"] == "" max = ENV["RESQUE_QUEUE_DEPTH"].to_i end return true if max == 0 # 0 means no limiting cur_depth = queue_depth(queuename) log! "queue #{queuename} depth = #{cur_depth} max = #{max}" return true if cur_depth < max false end |