Class: Resque::Plugins::Loner::Helpers
- Inherits:
-
Object
- Object
- Resque::Plugins::Loner::Helpers
- Extended by:
- Helpers
- Defined in:
- lib/resque-loner/helpers.rb
Class Method Summary collapse
- .cleanup_loners(queue) ⇒ Object
- .item_is_a_unique_job?(item) ⇒ Boolean
- .item_ttl(item) ⇒ Object
- .job_destroy(queue, klass, *args) ⇒ Object
- .loner_queued?(queue, item) ⇒ Boolean
- .mark_loner_as_queued(queue, item) ⇒ Object
- .mark_loner_as_unqueued(queue, job) ⇒ Object
- .unique_job_queue_key(queue, item) ⇒ Object
Class Method Details
.cleanup_loners(queue) ⇒ Object
65 66 67 68 |
# File 'lib/resque-loner/helpers.rb', line 65 def self.cleanup_loners(queue) keys = redis.keys("loners:queue:#{queue}:job:*") redis.del(*keys) unless keys.empty? end |
.item_is_a_unique_job?(item) ⇒ Boolean
32 33 34 35 36 37 38 39 |
# File 'lib/resque-loner/helpers.rb', line 32 def self.item_is_a_unique_job?(item) begin klass = constantize(item[:class] || item["class"]) klass.included_modules.include?(::Resque::Plugins::UniqueJob) rescue false # Resque testsuite also submits strings as job classes while Resque.enqueue'ing, end # so resque-loner should not start throwing up when that happens. end |
.item_ttl(item) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/resque-loner/helpers.rb', line 41 def self.item_ttl(item) begin constantize(item[:class] || item["class"]).loner_ttl rescue -1 end end |
.job_destroy(queue, klass, *args) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/resque-loner/helpers.rb', line 49 def self.job_destroy(queue, klass, *args) klass = klass.to_s redis_queue = "queue:#{queue}" redis.lrange(redis_queue, 0, -1).each do |string| json = decode(string) match = json['class'] == klass match &= json['args'] == args unless args.empty? if match Resque::Plugins::Loner::Helpers.mark_loner_as_unqueued( queue, json ) end end end |
.loner_queued?(queue, item) ⇒ Boolean
7 8 9 10 |
# File 'lib/resque-loner/helpers.rb', line 7 def self.loner_queued?(queue, item) return false unless item_is_a_unique_job?(item) redis.get(unique_job_queue_key(queue, item)) == "1" end |
.mark_loner_as_queued(queue, item) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/resque-loner/helpers.rb', line 12 def self.mark_loner_as_queued(queue, item) return unless item_is_a_unique_job?(item) key = unique_job_queue_key(queue, item) redis.set(key, 1) unless(ttl=item_ttl(item)) == -1 # no need to incur overhead for default value redis.expire(key, ttl) end end |
.mark_loner_as_unqueued(queue, job) ⇒ Object
21 22 23 24 25 |
# File 'lib/resque-loner/helpers.rb', line 21 def self.mark_loner_as_unqueued(queue, job) item = job.is_a?(Resque::Job) ? job.payload : job return unless item_is_a_unique_job?(item) redis.del(unique_job_queue_key(queue, item)) end |
.unique_job_queue_key(queue, item) ⇒ Object
27 28 29 30 |
# File 'lib/resque-loner/helpers.rb', line 27 def self.unique_job_queue_key(queue, item) job_key = constantize(item[:class] || item["class"]).redis_key_loner(item) "loners:queue:#{queue}:job:#{job_key}" end |