Module: Resque::Plugins::Filter::JobFilter
Defined in: lib/resque/plugins/filter/job_filter.rb
Class Attribute Summary
- .strategy ⇒ Object
  Optional. Names the reserve strategy (simple or optimistic) that #reserve_with_filter dispatches to.
Class Method Summary
- .configure {|_self| ... } ⇒ Object
  Allows configuring via class accessors.
- .extended(receiver) ⇒ Object
Instance Method Summary
- #filter(job) ⇒ Object
- #optimistic_reserve_with_filter(queue) ⇒ Object
  When filtering on hostname in a cluster of many workers, a job can take a while to reach the intended machine, because unrelated workers thrash on it and keep it from reaching the right worker.
- #reserve_with_filter(queue) ⇒ Object
- #simple_reserve_with_filter(queue) ⇒ Object
Class Attribute Details
.strategy ⇒ Object
Optional. Names the reserve strategy (simple or optimistic) that #reserve_with_filter dispatches to.
# File 'lib/resque/plugins/filter/job_filter.rb', line 21
def strategy
  @strategy
end
Class Method Details
.configure {|_self| ... } ⇒ Object
Allows configuring via class accessors. The block receives the module itself.
# File 'lib/resque/plugins/filter/job_filter.rb', line 33
def self.configure
  yield self
end
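A minimal sketch of the configure pattern, using a stand-in module so it runs without Resque loaded. `JobFilterSketch` is a hypothetical name, and the writer for `strategy` is an assumption (only the reader appears in the listing above). The values `:simple` and `:optimistic` are inferred from the names of the two `*_reserve_with_filter` methods.

```ruby
# Stand-in for the documented module; not the library itself.
module JobFilterSketch
  class << self
    # Assumption: the real module also exposes a writer for strategy.
    attr_accessor :strategy

    # Same shape as the documented .configure: yields the module itself.
    def configure
      yield self
    end
  end
end

JobFilterSketch.configure do |config|
  config.strategy = :optimistic # or :simple
end
```

With the real plugin loaded, the equivalent call would presumably target `Resque::Plugins::Filter::JobFilter.configure`.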
.extended(receiver) ⇒ Object
# File 'lib/resque/plugins/filter/job_filter.rb', line 37
def self.extended(receiver)
  class << receiver
    alias reserve_without_filter reserve
    alias reserve reserve_with_filter
  end
end
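The hook above swaps the receiver's `reserve` for the filtered version while keeping the original reachable as `reserve_without_filter`. A minimal sketch of that alias chain follows; `FilterHookSketch` and `FakeJob` are hypothetical stand-ins, and the filtered method here only tags the result so the wrapping is visible.

```ruby
module FilterHookSketch
  # Runs when a class calls extend(FilterHookSketch).
  def self.extended(receiver)
    class << receiver
      alias reserve_without_filter reserve # keep the original reachable
      alias reserve reserve_with_filter    # route reserve through the wrapper
    end
  end

  # Simplified wrapper: delegates to the original and tags the result.
  def reserve_with_filter(queue)
    job = reserve_without_filter(queue)
    job && "filtered:#{job}"
  end
end

# Stand-in for the class being extended (Resque::Job in the real plugin, presumably).
class FakeJob
  def self.reserve(queue)
    "job-from-#{queue}"
  end
end

FakeJob.extend(FilterHookSketch)
```

After the `extend`, callers of `FakeJob.reserve` go through the wrapper without changing their code.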
Instance Method Details
#filter(job) ⇒ Object
# File 'lib/resque/plugins/filter/job_filter.rb', line 95
def filter(job)
  if job.payload_class.respond_to?(:filter)
    return job.payload_class.filter(*job.args)
  else
    return true
  end
end
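A job class opts in by defining a class-level `filter` that receives the job's arguments; classes without one are always accepted. The sketch below assumes a hypothetical `HostPinnedJob` whose first argument is the target hostname. `SketchJob` stands in for the job object and exposes only the two methods `#filter` reads, and `sketch_filter` repeats the documented logic outside the module.

```ruby
require "socket"

# Hypothetical job class: runs only on the host named in its first argument.
class HostPinnedJob
  def self.filter(target_host, *_rest)
    target_host == Socket.gethostname
  end

  def self.perform(target_host, *_rest)
    # the actual work would go here
  end
end

# Hypothetical job class that does not participate in filtering.
class PlainJob
  def self.perform(*_args); end
end

# Stand-in for the job object: only payload_class and args are needed.
SketchJob = Struct.new(:payload_class, :args)

# Same decision logic as the documented #filter.
def sketch_filter(job)
  return true unless job.payload_class.respond_to?(:filter)

  job.payload_class.filter(*job.args)
end
```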
#optimistic_reserve_with_filter(queue) ⇒ Object
When filtering on hostname in a cluster of many workers, a job can take a while to reach the intended machine: unrelated workers thrash on the job, popping it and pushing it back, which keeps it from reaching the right worker. This strategy instead peeks at the head of the queue and pops the job only if the filter accepts it, so the right worker gets the job sooner. If other jobs are ahead of it on the queue, it may still take a while. This strategy is also not safe with the DistributedRedis client, because it relies on watch/multi/exec.
# File 'lib/resque/plugins/filter/job_filter.rb', line 69
def optimistic_reserve_with_filter(queue)
  # http://redis.io/topics/transactions
  # WATCH mykey
  # val = GET mykey
  # val = val + 1
  # MULTI
  # SET mykey $val
  # EXEC

  key = "queue:#{queue}"
  redis.watch(key)
  return unless payload = decode(redis.lindex(key, 0))
  job = new(queue, payload)
  if filter(job)
    success = redis.multi do
      redis.lpop(key)
    end
    return job if success
  end
  return nil
end
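The peek-then-pop idea can be illustrated without a Redis server. The sketch below is an in-memory simulation, not the Redis client API: `WatchedList`, `foreign_pop`, `pop_if_unchanged`, and `optimistic_reserve` are all hypothetical names. A version counter plays the role of WATCH, and the conditional pop plays the role of MULTI/EXEC aborting when the watched key has changed.

```ruby
# In-memory stand-in for one Redis list with WATCH-style optimistic locking.
class WatchedList
  def initialize(items)
    @items = items.dup
    @version = 0
    @watched_version = nil
  end

  # Like WATCH: remember the state the later pop depends on.
  def watch
    @watched_version = @version
  end

  # Like LINDEX key 0: look at the head without removing it.
  def peek
    @items.first
  end

  # Simulates another worker modifying the list in the meantime.
  def foreign_pop
    @version += 1
    @items.shift
  end

  # Like MULTI/LPOP/EXEC: pops only if nothing changed since watch.
  def pop_if_unchanged
    unchanged = @watched_version == @version
    @watched_version = nil
    return nil unless unchanged

    @version += 1
    @items.shift
  end
end

# Peek at the head, ask the block whether to take it, then pop atomically.
# interfere: true simulates a competing worker winning the race.
def optimistic_reserve(list, interfere: false)
  list.watch
  head = list.peek
  return nil if head.nil?
  return nil unless yield(head) # rejected jobs are never removed

  list.foreign_pop if interfere
  list.pop_if_unchanged
end
```

A rejected job stays at the head of the list, so it is not reordered, but it also blocks this worker from seeing anything behind it, which matches the caveat about other jobs on the queue.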
#reserve_with_filter(queue) ⇒ Object
# File 'lib/resque/plugins/filter/job_filter.rb', line 44
def reserve_with_filter(queue)
  send("#{JobFilter.strategy}_reserve_with_filter", queue)
end
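The method name is built from the configured strategy and invoked with `send`. A small sketch of that dispatch follows, using a hypothetical `DispatchSketch` class whose strategy methods only report which one ran. It also shows what this pattern does when no strategy is set; whether the library assigns a default is not shown in the listing above.

```ruby
class DispatchSketch
  attr_accessor :strategy

  # Same dispatch shape as the documented #reserve_with_filter.
  def reserve_with_filter(queue)
    send("#{strategy}_reserve_with_filter", queue)
  end

  def simple_reserve_with_filter(queue)
    [:simple, queue]
  end

  def optimistic_reserve_with_filter(queue)
    [:optimistic, queue]
  end
end

dispatcher = DispatchSketch.new
dispatcher.strategy = :optimistic
dispatcher.reserve_with_filter("mail") # runs optimistic_reserve_with_filter

# With strategy unset, the built name is "_reserve_with_filter",
# which does not exist in this sketch, so send raises NoMethodError.
```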
#simple_reserve_with_filter(queue) ⇒ Object
# File 'lib/resque/plugins/filter/job_filter.rb', line 48
def simple_reserve_with_filter(queue)
  return unless job = reserve_without_filter(queue)

  # if the class participates in filter, and doesn't want to be run,
  # then push it back onto queue
  if filter(job)
    return job
  else
    Resque.push(queue, job.payload)
    return nil
  end
end
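The pop-then-push-back behavior can be sketched with a plain array standing in for the queue. `simple_reserve` is a hypothetical helper; `shift` stands in for `reserve_without_filter`, and `push` stands in for `Resque.push`, assuming the latter appends to the tail of the list.

```ruby
# Pop the head; keep it if the block accepts it, otherwise re-queue it.
def simple_reserve(queue)
  job = queue.shift # stands in for reserve_without_filter
  return nil if job.nil?

  if yield(job)
    job
  else
    queue.push(job) # stands in for Resque.push (assumed to append at the tail)
    nil
  end
end

queue = ["for-host-b", "for-host-a"]
on_host_a = ->(job) { job == "for-host-a" }

first_try = simple_reserve(queue, &on_host_a)  # head is rejected and moves to the tail
second_try = simple_reserve(queue, &on_host_a) # the matching job is now at the head
```

Under that assumption, a rejected job loses its place in line each time a non-matching worker picks it up, which is the thrashing the optimistic strategy tries to avoid.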