Module: Resque::Plugins::Filter::JobFilter

Defined in:
lib/resque/plugins/filter/job_filter.rb

Class Attribute Details

.strategy ⇒ Object

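Optional. Selects the reserve strategy: #reserve_with_filter dispatches to the instance method named "#{strategy}_reserve_with_filter", so the values that match a method documented here are simple and optimistic.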



# File 'lib/resque/plugins/filter/job_filter.rb', line 21

def strategy
  @strategy
end

Class Method Details

.configure {|_self| ... } ⇒ Object

Yields the module itself so that settings such as strategy can be assigned through its class accessors.

Yields:

  • (_self)

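Yield Parameters:

  • (Resque::Plugins::Filter::JobFilter) _self: the module itself, so the block can assign to its class accessors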


# File 'lib/resque/plugins/filter/job_filter.rb', line 33

def self.configure
  yield self
end
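
A minimal sketch of how configure might be called, for example from an initializer. The module below is a stand-in that mirrors only the strategy accessor and configure method shown above, so the example runs without the gem. The value :optimistic is an assumption inferred from the optimistic_reserve_with_filter method name.

```ruby
# Stand-in for Resque::Plugins::Filter::JobFilter, reduced to the two
# class-level members documented above (an assumption for illustration).
module JobFilterSketch
  class << self
    attr_accessor :strategy
  end

  def self.configure
    yield self
  end
end

# The block receives the module itself, so settings are plain assignments.
JobFilterSketch.configure do |config|
  config.strategy = :optimistic
end

puts JobFilterSketch.strategy.inspect
```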

.extended(receiver) ⇒ Object



# File 'lib/resque/plugins/filter/job_filter.rb', line 37

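def self.extended(receiver)
  # Wrap the receiver's class-level reserve: keep the original as
  # reserve_without_filter and route reserve through reserve_with_filter.
  class << receiver
    alias reserve_without_filter reserve
    alias reserve reserve_with_filter
  end
end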
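
The alias chain set up by .extended can be sketched on a toy class. FilterSketch and ToyJob are hypothetical names used only for this illustration, and the even-number check stands in for the real filter logic.

```ruby
# Hypothetical module reproducing the alias chain from .extended.
module FilterSketch
  def self.extended(receiver)
    class << receiver
      alias reserve_without_filter reserve
      alias reserve reserve_with_filter
    end
  end

  # Wraps the original reserve; here it simply rejects odd numbers.
  def reserve_with_filter(queue)
    item = reserve_without_filter(queue)
    item if item && item.even?
  end
end

# Hypothetical stand-in for a job class with a class-level reserve.
class ToyJob
  QUEUES = { "numbers" => [1, 2] }

  def self.reserve(queue)
    QUEUES[queue].shift
  end
end

ToyJob.extend(FilterSketch)

first  = ToyJob.reserve("numbers") # 1 is odd, so the wrapper returns nil
second = ToyJob.reserve("numbers") # 2 is even, so it passes through
```

Because the hook runs after the module's methods are added to the receiver, reserve_with_filter is already visible when the second alias is evaluated.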

Instance Method Details

#filter(job) ⇒ Object



# File 'lib/resque/plugins/filter/job_filter.rb', line 95

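def filter(job)
  # Job classes that do not define .filter are always accepted.
  return true unless job.payload_class.respond_to?(:filter)

  # Otherwise the job class decides, based on the job's arguments.
  job.payload_class.filter(*job.args)
end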
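
As an illustration of the contract #filter relies on, the sketch below defines a job class with a class-level filter that accepts a job only on a given host. HostBoundJob, PlainJob, JobSketch and filter_sketch are hypothetical names, and JobSketch is a minimal stand-in exposing only the payload_class and args readers that #filter uses.

```ruby
require "socket"

# Hypothetical job class: runs only on the host named in its first argument.
class HostBoundJob
  def self.filter(hostname, *_rest)
    hostname == Socket.gethostname
  end

  def self.perform(hostname, *rest)
    # work that must happen on `hostname`
  end
end

# Hypothetical job class without .filter: it is always accepted.
class PlainJob
  def self.perform(*args); end
end

# Minimal stand-in for a job object exposing the two readers #filter uses.
JobSketch = Struct.new(:payload_class, :args)

# Same decision as #filter above, written as a free-standing method.
def filter_sketch(job)
  return true unless job.payload_class.respond_to?(:filter)

  job.payload_class.filter(*job.args)
end

here  = JobSketch.new(HostBoundJob, [Socket.gethostname, 42])
there = JobSketch.new(HostBoundJob, ["some-other-host.invalid", 42])
plain = JobSketch.new(PlainJob, [1, 2, 3])

puts filter_sketch(here)
puts filter_sketch(there)
puts filter_sketch(plain)
```

The filter receives the same arguments as perform, so it can decide using only the job's payload.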

#optimistic_reserve_with_filter(queue) ⇒ Object

When filtering on hostname in a cluster of many workers, a job can take a while to reach the intended machine, because unrelated workers keep popping it and pushing it back, which prevents it from reaching the right worker. This strategy peeks at the job at the head of the queue and pops it only if the filter accepts it, so the right worker gets the job sooner. However, if other jobs are ahead of it on the queue, it may still take a while. This strategy is also not safe with a DistributedRedis client, because it relies on WATCH/MULTI/EXEC.



# File 'lib/resque/plugins/filter/job_filter.rb', line 69

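def optimistic_reserve_with_filter(queue)
  # Optimistic locking, following http://redis.io/topics/transactions:
  #   WATCH mykey
  #   val = GET mykey
  #   val = val + 1
  #   MULTI
  #   SET mykey $val
  #   EXEC

  key = "queue:#{queue}"
  redis.watch(key)

  # Peek at the head of the queue without removing it.
  payload = decode(redis.lindex(key, 0))
  job = payload && new(queue, payload)

  unless job && filter(job)
    # Nothing to take: release the WATCH so it cannot abort a later transaction.
    redis.unwatch
    return nil
  end

  # EXEC is aborted, and multi returns nil, if another client modified
  # the queue after WATCH; in that case the job is not ours to run.
  success = redis.multi do
    redis.lpop(key)
  end
  success ? job : nil
end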
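
The watch, peek, conditional pop sequence can be sketched without a Redis server. TinyStore below is a hypothetical in-memory stand-in, not the redis-rb API: it models one list with a version counter, and it folds the competing worker into the same object, whereas real WATCH state is per connection. Clearing the watch when a job is declined is a choice made in this sketch.

```ruby
# In-memory stand-in for a single Redis list with WATCH-like semantics.
# Illustration only; this is not the redis-rb API.
class TinyStore
  def initialize(items)
    @items = items.dup
    @version = 0            # bumped on every modification
    @watched_version = nil
  end

  def watch
    @watched_version = @version
  end

  def unwatch
    @watched_version = nil
  end

  def peek
    @items.first
  end

  # Unconditional pop, as a competing worker would do.
  def pop
    @version += 1
    @items.shift
  end

  # Pops only if nothing changed since watch; returns nil when aborted.
  def pop_if_unchanged
    stale = @watched_version && @watched_version != @version
    @watched_version = nil
    return nil if stale

    [pop]
  end
end

def optimistic_take(store, accept, interfere: false)
  store.watch
  item = store.peek
  unless item && accept.call(item)
    store.unwatch
    return nil
  end
  store.pop if interfere # simulate another worker winning the race
  store.pop_if_unchanged ? item : nil
end

mine = ->(item) { item[:host] == "a" }

store   = TinyStore.new([{ host: "a" }, { host: "b" }])
taken   = optimistic_take(store, mine) # head accepted and popped
skipped = optimistic_take(store, mine) # head is for host "b": left in place

contested = TinyStore.new([{ host: "a" }, { host: "a" }])
raced     = optimistic_take(contested, mine, interfere: true) # aborted
```

A rejected job stays at the head of the queue, which is what lets the intended worker pick it up without it being cycled through the queue first.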

#reserve_with_filter(queue) ⇒ Object



# File 'lib/resque/plugins/filter/job_filter.rb', line 44

def reserve_with_filter(queue)
  send("#{JobFilter.strategy}_reserve_with_filter", queue)
end

#simple_reserve_with_filter(queue) ⇒ Object



# File 'lib/resque/plugins/filter/job_filter.rb', line 48

def simple_reserve_with_filter(queue)
  return unless job = reserve_without_filter(queue)
  
  # if the class participates in filter, and doesn't want to be run,
  # then push it back onto queue
  if filter(job)
    return job
  else
    Resque.push(queue, job.payload)
    return nil
  end
end
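
The effect of the simple strategy on queue order can be sketched with a plain array. simple_take is a hypothetical helper, and the sketch assumes that a rejected job is re-enqueued at the tail of the queue, as pushing onto a Resque queue normally does.

```ruby
# In-memory sketch of the simple strategy: pop from the head, and if the
# filter rejects the job, push it back onto the tail.
def simple_take(queue, accept)
  job = queue.shift
  return nil unless job
  return job if accept.call(job)

  queue.push(job) # rejected: re-enqueue at the tail
  nil
end

queue = [{ host: "b", id: 1 }, { host: "a", id: 2 }]
on_a  = ->(job) { job[:host] == "a" }

first  = simple_take(queue, on_a) # job 1 is for host "b": requeued, nil returned
order  = queue.map { |job| job[:id] }
second = simple_take(queue, on_a) # job 2 is now at the head and is accepted
```

Each rejection costs a pop and a push and moves the job behind everything else on the queue, which is the thrashing that the optimistic strategy tries to avoid.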