Module: Resque::Plugins::ConcurrentRestriction::Job
- Defined in:
- lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb
Class Method Summary collapse
Instance Method Summary collapse
- #get_queued_job(queue) ⇒ Object
- #get_restricted_job(queue) ⇒ Object
-
#reserve_with_restriction(queue) ⇒ Object
Wrap reserve so we can move a job to restriction queue if it is restricted This needs to be a class method.
Class Method Details
.extended(receiver) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 40 def self.extended(receiver) class << receiver alias reserve_without_restriction reserve alias reserve reserve_with_restriction end end |
Instance Method Details
#get_queued_job(queue) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 69 def get_queued_job(queue) # Bounded retry 1.upto(ConcurrentRestriction.reserve_queued_job_attempts) do |i| resque_job = reserve_without_restriction(queue) # Short-curcuit if a job was not found return if resque_job.nil? # If there is a job on regular queues, then only run it if its not restricted job_class = resque_job.payload_class job_args = resque_job.args # Return to work on job if not a restricted job return resque_job unless job_class.is_a?(ConcurrentRestriction) # Keep trying if job is restricted. If job is runnable, we keep the lock until # done_working return resque_job unless job_class.stash_if_restricted(resque_job) end # Safety net, here in case we hit the upper bound and there are still queued items return nil end |
#get_restricted_job(queue) ⇒ Object
62 63 64 65 66 67 |
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 62 def get_restricted_job(queue) # Try to find a runnable job from restriction queues # This also acquires a restriction lock, which is released in done_working resque_job = ConcurrentRestrictionJob.next_runnable_job(queue) return resque_job end |
#reserve_with_restriction(queue) ⇒ Object
Wrap reserve so we can move a job to restriction queue if it is restricted This needs to be a class method
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 49 def reserve_with_restriction(queue) order = [:get_queued_job, :get_restricted_job] order.reverse! if ConcurrentRestriction.restricted_before_queued resque_job = nil order.each do |m| resque_job ||= self.send(m, queue) end # Return job or nil to move on to next queue if we couldn't get a job return resque_job end |