Module: Resque::DynamicQueues
- Defined in:
- lib/resque/dynamic_queues.rb
Constant Summary collapse
- DEFAULT_QUEUE_DEPTH =
0
Class Method Summary collapse
Instance Method Summary collapse
- #create_job(queue, payload) ⇒ Object
- #filter_busy_queues(qs) ⇒ Object
- #get_categorized_queues(queue_list) ⇒ Object
- #get_grouped_queues ⇒ Object
- #get_job_from_queues(grouped_queues) ⇒ Object
- #get_next_job(grouped_queues) ⇒ Object
- #get_queued_job(grouped_queues) ⇒ Object
- #get_restricted_job ⇒ Object
- #queue_depth(queuename) ⇒ Object
-
#queues_with_dynamic ⇒ Object
Returns a list of queues to use when searching for a job.
- #reserve_with_round_robin ⇒ Object
- #rotated_queues ⇒ Object
- #should_work_on_queue?(queuename) ⇒ Boolean
Class Method Details
.included(receiver) ⇒ Object
235 236 237 238 239 240 241 242 |
# File 'lib/resque/dynamic_queues.rb', line 235
#
# Hook invoked when this module is included into +receiver+.
#
# Keeps the receiver's existing +queues+ and +reserve+ methods reachable as
# +queues_without_dynamic+ / +reserve_without_round_robin+, then repoints the
# public names at +queues_with_dynamic+ / +reserve_with_round_robin+.
# Each original must be saved before its name is repointed.
def self.included(receiver)
  receiver.class_eval do
    alias_method :queues_without_dynamic, :queues
    alias_method :queues, :queues_with_dynamic

    alias_method :reserve_without_round_robin, :reserve
    alias_method :reserve, :reserve_with_round_robin
  end
end
Instance Method Details
#create_job(queue, payload) ⇒ Object
98 99 100 101 |
# File 'lib/resque/dynamic_queues.rb', line 98
#
# Builds a Resque::Job for +queue+ from an already-decoded +payload+.
#
# Returns nil (without building anything) when +payload+ is nil or false.
def create_job(queue, payload)
  Resque::Job.new(queue, payload) if payload
end
#filter_busy_queues(qs) ⇒ Object
3 4 5 6 |
# File 'lib/resque/dynamic_queues.rb', line 3
#
# Returns the entries of +qs+ that no currently-working worker is processing.
#
# +qs+ may be a single queue name or a list; nil entries are dropped.
# The caller's object is not modified.
def filter_busy_queues(qs)
  in_progress = []
  Resque::Worker.working.each do |worker|
    name = worker.job["queue"]
    in_progress << name unless name.nil?
  end

  candidates = Array(qs.dup).compact
  candidates - in_progress
end
#get_categorized_queues(queue_list) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/resque/dynamic_queues.rb', line 26
#
# Buckets queue names by the priority token found in the second
# underscore-separated segment of each name, and returns the buckets in
# priority order.
#
# queue_list - Array of queue name Strings (duplicates are ignored).
#
# Returns an Array of [rank, queues] pairs sorted by rank, where rank is
# 0 (Synchronous), 1 (High), 2 (Medium) or 3 (Low). Names whose second
# segment is missing or is not one of those four tokens are ranked as
# Medium. Within a bucket, queues keep their first-seen order.
def get_categorized_queues(queue_list)
  # The position in this list IS the rank; keeping a single list avoids the
  # name list and the rank map drifting apart.
  priority_order = %w[Synchronous High Medium Low]
  default_rank = priority_order.index("Medium")

  queue_list
    .uniq
    .group_by { |queue| priority_order.index(queue.split("_")[1]) || default_rank }
    .sort
end
#get_grouped_queues ⇒ Object
52 53 54 |
# File 'lib/resque/dynamic_queues.rb', line 52
#
# Groups this worker's queues (sorted by name) by the first run of 1-20
# digits that is immediately followed by an underscore.
#
# Returns a Hash mapping that digit String to the Array of queue names
# carrying it. Queues with no such digits-underscore run are collected
# under the nil key.
def get_grouped_queues
  queues.sort.group_by do |name|
    # Match once and reuse the MatchData instead of running the regex twice.
    prefix = /(\d{1,20})_/.match(name)
    prefix && prefix[1]
  end
end
#get_job_from_queues(grouped_queues) ⇒ Object
134 135 136 |
# File 'lib/resque/dynamic_queues.rb', line 134
#
# Issues a blocking left-pop (BLPOP) on Resque's Redis connection across the
# given keys and returns whatever that call returns.
#
# grouped_queues - Array of Redis list keys, in the order they should be
#                  checked.
#
# The timeout in seconds comes from ENV["BLPOP_TIMEOUT"] and defaults to 30
# when the variable is unset or blank. An explicit "0" is passed through
# unchanged.
def get_job_from_queues(grouped_queues)
  # `ENV[...].to_i || 30` can never fall back: nil.to_i is 0 and 0 is truthy
  # in Ruby, so an unset variable used to send timeout 0 to BLPOP.
  configured = ENV["BLPOP_TIMEOUT"].to_s.strip
  timeout = configured.empty? ? 30 : configured.to_i

  Resque.redis.blpop(grouped_queues, :timeout => timeout)
end
#get_next_job(grouped_queues) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/resque/dynamic_queues.rb', line 103
#
# Pops the next job from the given instance-grouped queues, rotating which
# app instance is polled first.
#
# grouped_queues - Hash of app-instance id => Array of queue names (the shape
#                  produced by get_grouped_queues).
#
# Instances whose id is greater than the id served last (@n) are polled
# first; instances with an id less than or equal to @n go to the back.
# Within an instance, queues follow get_categorized_queues order. Queues
# rejected by should_work_on_queue? are skipped.
#
# Returns the value of create_job for the popped entry, or nil when no queue
# is eligible or nothing was popped.
def get_next_job(grouped_queues)
  @n ||= 0

  ahead = []   # instances after the one served last: polled first
  behind = []  # instances up to and including the one served last

  grouped_queues.sort_by { |app_instance_id, _| app_instance_id.to_i }.each do |app_instance_id, queues|
    ordered = get_categorized_queues(queues).flat_map { |_rank, names| names }
    (app_instance_id.to_i <= @n ? behind : ahead).concat(ordered)
  end

  redis_keys = (ahead + behind)
               .select { |queue| should_work_on_queue?(queue) }
               .map { |queue| "queue:#{queue}" }

  # Nothing eligible: report "no job" rather than issuing a pop with no keys.
  return nil if redis_keys.empty?

  key, payload = get_job_from_queues(redis_keys)
  return nil if key.blank?

  # Split only at the first "queue:" so a queue name that itself contains
  # "queue:" is not truncated.
  queue = key.split("queue:", 2)[1]

  # Track the app instance id that was just served.
  @n = queue.split('_').first.to_i

  create_job(queue, Resque.decode(payload))
end
#get_queued_job(grouped_queues) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/resque/dynamic_queues.rb', line 142
#
# Fetches the next runnable job from the regular queues.
#
# grouped_queues - Hash of app-instance id => Array of queue names, passed
#                  straight through to get_next_job.
#
# When Resque::Plugins::ConcurrentRestriction is not loaded this is simply
# get_next_job. When it is loaded, jobs whose payload class is_a? that module
# are offered to stash_if_restricted; a truthy result means the job was
# stashed and the next one is tried, up to reserve_queued_job_attempts times.
#
# Returns a job, or nil when the queues are empty or every attempt produced a
# job that was stashed.
def get_queued_job(grouped_queues)
  return get_next_job(grouped_queues) unless defined?(Resque::Plugins::ConcurrentRestriction)

  # Bounded retry
  1.upto(Resque::Plugins::ConcurrentRestriction.reserve_queued_job_attempts) do
    resque_job = get_next_job(grouped_queues)

    # Short-circuit if a job was not found
    return if resque_job.nil?

    job_class = resque_job.payload_class

    # Work on the job right away if its class is not restricted
    return resque_job unless job_class.is_a?(Resque::Plugins::ConcurrentRestriction)

    # Keep trying if the job was stashed. If the job is runnable, we keep the
    # lock until done_working
    return resque_job unless job_class.stash_if_restricted(resque_job)
  end

  # Safety net, here in case we hit the upper bound and there are still queued items
  nil
end
#get_restricted_job ⇒ Object
138 139 140 |
# File 'lib/resque/dynamic_queues.rb', line 138
#
# Asks the concurrent-restriction plugin for a previously stashed job that is
# now runnable.
#
# Returns whatever ConcurrentRestrictionJob.next_runnable_job_random returns,
# or nil when the plugin's ConcurrentRestrictionJob class is not loaded. The
# guard keeps this consistent with get_queued_job, which also treats the
# plugin as optional.
def get_restricted_job
  return nil unless defined?(Resque::Plugins::ConcurrentRestrictionJob)

  Resque::Plugins::ConcurrentRestrictionJob.next_runnable_job_random
end
#queue_depth(queuename) ⇒ Object
20 21 22 23 24 |
# File 'lib/resque/dynamic_queues.rb', line 20
#
# Counts the currently-working workers whose job came from +queuename+.
#
# Workers whose job carries no "queue" entry are never counted.
def queue_depth(queuename)
  Resque::Worker.working.count do |worker|
    active = worker.job["queue"]
    !active.nil? && active == queuename
  end
end
#queues_with_dynamic ⇒ Object
Returns a list of queues to use when searching for a job.
A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
The splat can also be used as a wildcard within a queue name, e.g. “*high*”, and negation can be indicated with a prefix of “!”.
An @key can be used to dynamically look up the queue list stored under that key in Redis. If no key is supplied, it defaults to the worker’s hostname, and wildcards and negations can be used inside this dynamic queue list. Set the queue list for a key with Resque.set_dynamic_queue(key, ["q1", "q2"]).
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/resque/dynamic_queues.rb', line 183
#
# Returns the list of queues to use when searching for a job.
#
# If no entry of @queues starts with "!" or "@" or contains "*", the result
# of queues_without_dynamic is returned untouched. Otherwise each entry is
# expanded in order against the known queues (Resque.queues):
#
# * "*" anywhere in an entry is a wildcard; matches are appended sorted.
# * A leading "!" removes matching queues from what has been matched so far.
# * "@key" (or "!@key") is replaced by the list from
#   Resque.get_dynamic_queue(key); an empty key means the worker's hostname.
#   With "!@key" every entry of that list has its negation flipped.
# * An entry without a wildcard that matches no known queue is kept as-is.
#
# Before expansion, known queues are dropped when their leading digits appear
# in the "APILimits" or "PauseQueue" sorted sets.
#
# Returns an Array of unique queue names.
def queues_with_dynamic
  queue_names = @queues.dup

  return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).size == 0

  real_queues = Resque.queues
  matched_queues = []

  # Remove queues under API limits. Members scoring below the current time
  # are purged first (presumably scores are expiry timestamps - confirm).
  # NOTE(review): this set is read through Redis.current while PauseQueue
  # below goes through Resque.redis; left as-is, confirm that is intended.
  Redis.current.zremrangebyscore("APILimits", "0", "(#{Time.now.to_i}")
  api_limit_instances = Redis.current.zrange("APILimits", 0, -1).map { |key| key.to_i if key.match(/^\d*$/) }.compact
  real_queues = real_queues.reject { |key| api_limit_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i) }

  # Queue pausing: members look like "<digits>__<anything>".
  Resque.redis.zremrangebyscore("PauseQueue", "0", "(#{Time.now.to_i}")
  paused_instances = Resque.redis.zrange("PauseQueue", 0, -1).map { |key| key.split("__")[0].to_i if key.match(/^\d*__.*/) }.compact
  real_queues = real_queues.reject { |key| paused_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i) }

  while q = queue_names.shift
    q = q.to_s

    if (reference = q.match(/^(!)?@(.*)/))
      key = reference[2].strip
      key = hostname if key.size == 0

      add_queues = Resque.get_dynamic_queue(key)
      # "!@key": flip the negation of every entry in the looked-up list.
      add_queues.map! { |name| name.gsub!(/^!/, '') || name.gsub!(/^/, '!') } if reference[1]

      queue_names.concat(add_queues)
      next
    end

    # Decide negation per entry. A flag that is only ever set to true would
    # stay set for every entry following the first "!" entry.
    negated = q.start_with?('!')
    q = q[1..-1] if negated

    patstr = q.gsub(/\*/, '.*')
    pattern = /^#{patstr}$/
    if negated
      matched_queues -= matched_queues.grep(pattern)
    else
      matches = real_queues.grep(pattern)
      matches = [q] if matches.size == 0 && q == patstr
      matched_queues.concat(matches.sort)
    end
  end

  matched_queues.uniq
end
#reserve_with_round_robin ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/resque/dynamic_queues.rb', line 56
#
# Reserves the next job for this worker, spreading work across the instance
# groups returned by get_grouped_queues.
#
# * If any queue falls outside an instance group (nil key) or there are no
#   groups at all, defers to reserve_without_round_robin.
# * If ZuoraConnect.configuration.blpop_queue is set, tries
#   get_restricted_job first and then get_queued_job.
# * Otherwise walks the groups starting at a rotating offset (@n), checking
#   each group's queues in get_categorized_queues order, and reserves from
#   the first queue that passes should_work_on_queue? and yields a job.
#
# The chosen job is also stored in @job_in_progress. Returns the job or nil.
# Any exception is logged and then re-raised.
def reserve_with_round_robin
  grouped_queues = get_grouped_queues
  instance_ids = grouped_queues.keys

  # Instance queue grouping only applies when every queue has an instance id.
  return reserve_without_round_robin if instance_ids.include?(nil) || instance_ids.empty?

  if ZuoraConnect.configuration.blpop_queue
    @job_in_progress = get_restricted_job
    @job_in_progress = get_queued_job(grouped_queues) unless @job_in_progress.present?
    return @job_in_progress
  end

  @n = ((@n || 0) + 1) % instance_ids.size

  instance_ids.rotate(@n).each do |instance_id|
    get_categorized_queues(grouped_queues[instance_id]).each do |_priority, queues|
      queues.each do |queue|
        log! "Checking #{queue}"
        next unless should_work_on_queue?(queue)

        @job_in_progress = Resque::Job.reserve(queue)
        next unless @job_in_progress

        log! "Found job on #{queue}"
        return @job_in_progress
      end
    end

    # Start the next search at the queue after the one from which we pick a job.
    @n += 1
  end

  nil
rescue Exception => error
  # Log-and-re-raise only: nothing is swallowed here.
  if defined?(Ougai::Logger) && Resque.logger.is_a?(Ougai::Logger)
    log_with_severity :error, "Error reserving job", error
  else
    log_with_severity :error, "Error reserving job: #{error.inspect}"
    log_with_severity :error, error.backtrace.join("\n")
  end
  raise error
end
#rotated_queues ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/resque/dynamic_queues.rb', line 8
#
# Returns this worker's queues rotated by a counter (@n) that advances on
# every call, so successive calls start from a different queue.
#
# An empty queue list is returned as-is; the counter still advances.
def rotated_queues
  @n = (@n || 0) + 1

  # `queues` is already the fully expanded list (resque-dynamic-queues).
  available = queues
  return available if available.empty?

  @n %= available.size
  available.rotate(@n)
end
#should_work_on_queue?(queuename) ⇒ Boolean
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/resque/dynamic_queues.rb', line 39
#
# Decides whether this worker may take a job from +queuename+ given the
# per-queue depth limit.
#
# The limit is ENV["RESQUE_QUEUE_DEPTH"] when set and non-empty, otherwise
# DEFAULT_QUEUE_DEPTH. Returns true when the worker's queue list contains
# "*", when the limit is 0, or when queue_depth(queuename) is below the
# limit; false otherwise.
def should_work_on_queue?(queuename)
  # workers with QUEUES=* are special and are not subject to queue depth setting
  return true if @queues.include?('*')

  configured = ENV["RESQUE_QUEUE_DEPTH"]
  max = configured.nil? || configured == "" ? DEFAULT_QUEUE_DEPTH : configured.to_i

  # 0 means no limiting
  return true if max == 0

  cur_depth = queue_depth(queuename)
  log! "queue #{queuename} depth = #{cur_depth} max = #{max}"
  cur_depth < max
end