Class: Delayed::Backend::Redis::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::Redis::Job
show all
- Extended by:
- ActiveModel::Callbacks
- Includes:
- ActiveModel::Dirty, Base
- Defined in:
- lib/delayed/backend/redis/job.rb
Direct Known Subclasses
Failed
Defined Under Namespace
Modules: Keys
Classes: Failed
Constant Summary
collapse
- WAITING_STRAND_JOB_PRIORITY =
2000000
- COLUMNS =
[]
- TIMESTAMP_COLUMNS =
We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.
[]
- INTEGER_COLUMNS =
[]
Constants included
from Base
Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY
Instance Attribute Summary collapse
Class Method Summary
collapse
-
.bulk_update(action, opts) ⇒ Object
perform a bulk update of a set of jobs action is :hold, :unhold, or :destroy to specify the jobs to act on, either pass opts = [list of job ids] or opts = <some flavor> to perform on all jobs of that flavor.
-
.clear_locks!(worker_name) ⇒ Object
-
.column(name, type) ⇒ Object
-
.create(attrs = {}) ⇒ Object
-
.create!(attrs = {}) ⇒ Object
-
.create_singleton(options) ⇒ Object
-
.find(ids) ⇒ Object
-
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
-
.find_one(id, options) ⇒ Object
-
.find_some(ids, options) ⇒ Object
-
.functions ⇒ Object
-
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object
-
.get_with_ids(ids) ⇒ Object
-
.instantiate(attrs) ⇒ Object
-
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
get the total job count for the given flavor flavor is :current, :future or :failed for the :failed flavor, queue is currently ignored.
-
.key_for_job_id(job_id) ⇒ Object
-
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
get a list of jobs of the given flavor in the given queue flavor is :current, :future, :failed, :strand or :tag depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue) for :strand it’s the strand name for :tag it’s the tag name for :failed it’s ignored.
-
.reconnect! ⇒ Object
-
.running_jobs ⇒ Object
-
.strand_size(strand) ⇒ Object
-
.tag_counts(flavor, limit, offset = 0) ⇒ Object
returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order flavor is :current or :all.
-
.unlock(jobs) ⇒ Object
Instance Method Summary
collapse
Methods included from Base
#batch?, #expired?, #failed?, #full_name, #hold!, included, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock
Constructor Details
#initialize(attrs = {}) ⇒ Job
Returns a new instance of Job.
135
136
137
138
139
140
|
# File 'lib/delayed/backend/redis/job.rb', line 135
def initialize(attrs = {})
attrs.each { |k, v| self.send("#{k}=", v) }
self.priority ||= 0
self.attempts ||= 0
@new_record = true
end
|
Instance Attribute Details
#on_conflict ⇒ Object
not saved, just used as a marker when creating
366
367
368
|
# File 'lib/delayed/backend/redis/job.rb', line 366
def on_conflict
@on_conflict
end
|
#singleton ⇒ Object
not saved, just used as a marker when creating
366
367
368
|
# File 'lib/delayed/backend/redis/job.rb', line 366
def singleton
@singleton
end
|
Class Method Details
.bulk_update(action, opts) ⇒ Object
perform a bulk update of a set of jobs action is :hold, :unhold, or :destroy to specify the jobs to act on, either pass opts = [list of job ids] or opts = <some flavor> to perform on all jobs of that flavor
see the list_jobs action for the list of available flavors and the meaning of opts for each
349
350
351
352
353
354
|
# File 'lib/delayed/backend/redis/job.rb', line 349
def self.bulk_update(action, opts)
if %w(current future).include?(opts[:flavor].to_s)
opts[:query] ||= Delayed::Settings.queue
end
functions.bulk_update(action, opts[:ids], opts[:flavor], opts[:query], db_time_now)
end
|
.clear_locks!(worker_name) ⇒ Object
323
324
325
326
327
328
329
|
# File 'lib/delayed/backend/redis/job.rb', line 323
def self.clear_locks!(worker_name)
self.running_jobs.each do |job|
job.unlock! if job.locked_by == worker_name
end
nil
end
|
.column(name, type) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/delayed/backend/redis/job.rb', line 97
def self.column(name, type)
COLUMNS << name
if type == :timestamp
TIMESTAMP_COLUMNS << name
elsif type == :integer
INTEGER_COLUMNS << name
end
attr_reader(name)
define_attribute_methods([name])
class_eval(<<-EOS, __FILE__, __LINE__ + 1)
def #{name}=(new_value)
#{name}_will_change! unless new_value == self.#{name}
@#{name} = new_value
end
EOS
end
|
.create(attrs = {}) ⇒ Object
148
149
150
151
152
|
# File 'lib/delayed/backend/redis/job.rb', line 148
def self.create(attrs = {})
result = new(attrs)
result.save
result
end
|
.create!(attrs = {}) ⇒ Object
154
155
156
157
158
|
# File 'lib/delayed/backend/redis/job.rb', line 154
def self.create!(attrs = {})
result = new(attrs)
result.save!
result
end
|
.create_singleton(options) ⇒ Object
356
357
358
|
# File 'lib/delayed/backend/redis/job.rb', line 356
def self.create_singleton(options)
self.create!(options.merge(:singleton => true))
end
|
.find(ids) ⇒ Object
168
169
170
171
172
173
174
|
# File 'lib/delayed/backend/redis/job.rb', line 168
def self.find(ids)
if Array === ids
find_some(ids, {})
else
find_one(ids, {})
end
end
|
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
252
253
254
255
256
257
258
259
260
261
|
# File 'lib/delayed/backend/redis/job.rb', line 252
def self.find_available(limit,
queue = Delayed::Settings.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
check_queue(queue)
check_priorities(min_priority, max_priority)
self.find(functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now))
end
|
.find_one(id, options) ⇒ Object
204
205
206
207
|
# File 'lib/delayed/backend/redis/job.rb', line 204
def self.find_one(id, options)
job = self.get_with_ids([id]).first
job || raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}")
end
|
.find_some(ids, options) ⇒ Object
209
210
211
|
# File 'lib/delayed/backend/redis/job.rb', line 209
def self.find_some(ids, options)
self.get_with_ids(ids).compact
end
|
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/delayed/backend/redis/job.rb', line 221
def self.get_and_lock_next_available(worker_name,
queue = Delayed::Settings.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY,
prefetch: nil,
prefetch_owner: nil,
forced_latency: nil)
check_queue(queue)
check_priorities(min_priority, max_priority)
if worker_name.is_a?(Array)
multiple_workers = true
worker_name = worker_name.first
end
now = db_time_now
now -= forced_latency if forced_latency
job_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, now)
job = instantiate_from_attrs(job_attrs) if multiple_workers
if job.nil?
job = {}
else
job = { worker_name => job }
end
end
job
end
|
.get_with_ids(ids) ⇒ Object
213
214
215
|
# File 'lib/delayed/backend/redis/job.rb', line 213
def self.get_with_ids(ids)
ids.map { |id| self.instantiate_from_attrs(redis.hgetall(key_for_job_id(id))) }
end
|
.instantiate(attrs) ⇒ Object
142
143
144
145
146
|
# File 'lib/delayed/backend/redis/job.rb', line 142
def self.instantiate(attrs)
result = new(attrs)
result.instance_variable_set(:@new_record, false)
result
end
|
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
get the total job count for the given flavor flavor is :current, :future or :failed for the :failed flavor, queue is currently ignored
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
|
# File 'lib/delayed/backend/redis/job.rb', line 299
def self.jobs_count(flavor,
queue = Delayed::Settings.queue)
case flavor.to_s
when 'current'
check_queue(queue)
redis.zcard(Keys::QUEUE[queue])
when 'future'
check_queue(queue)
redis.zcard(Keys::FUTURE_QUEUE[queue])
when 'failed'
redis.zcard(Keys::FAILED_JOBS)
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
|
.key_for_job_id(job_id) ⇒ Object
217
218
219
|
# File 'lib/delayed/backend/redis/job.rb', line 217
def self.key_for_job_id(job_id)
Keys::JOB[job_id]
end
|
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
get a list of jobs of the given flavor in the given queue flavor is :current, :future, :failed, :strand or :tag depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue) for :strand it’s the strand name for :tag it’s the tag name for :failed it’s ignored
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
|
# File 'lib/delayed/backend/redis/job.rb', line 270
def self.list_jobs(flavor,
limit,
offset = 0,
query = nil)
case flavor.to_s
when 'current'
query ||= Delayed::Settings.queue
check_queue(query)
self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
when 'future'
query ||= Delayed::Settings.queue
check_queue(query)
self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
when 'failed'
Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
when 'strand'
self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
when 'tag'
ids = redis.smembers(Keys::TAG[query])
self.find(ids[offset, limit])
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
|
.reconnect! ⇒ Object
192
193
194
195
196
197
198
|
# File 'lib/delayed/backend/redis/job.rb', line 192
def self.reconnect!
redis.respond_to?(:reconnect) ?
redis.reconnect :
redis.client.reconnect
end
|
.running_jobs ⇒ Object
319
320
321
|
# File 'lib/delayed/backend/redis/job.rb', line 319
def self.running_jobs()
self.find(redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf"))
end
|
.strand_size(strand) ⇒ Object
315
316
317
|
# File 'lib/delayed/backend/redis/job.rb', line 315
def self.strand_size(strand)
redis.llen(Keys::STRAND[strand])
end
|
.tag_counts(flavor, limit, offset = 0) ⇒ Object
returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order flavor is :current or :all
334
335
336
337
338
339
340
|
# File 'lib/delayed/backend/redis/job.rb', line 334
def self.tag_counts(flavor,
limit,
offset = 0)
raise(ArgumentError, "invalid flavor: #{flavor.inspect}") unless %w(current all).include?(flavor.to_s)
key = Keys::TAG_COUNTS[flavor]
redis.zrevrangebyscore(key, '+inf', 1, :limit => [offset, limit], :withscores => true).map { |tag, count| { :tag => tag, :count => count } }
end
|
.unlock(jobs) ⇒ Object
360
361
362
363
|
# File 'lib/delayed/backend/redis/job.rb', line 360
def self.unlock(jobs)
jobs.each(&:unlock!)
jobs.length
end
|
Instance Method Details
#==(other) ⇒ Object
184
185
186
|
# File 'lib/delayed/backend/redis/job.rb', line 184
def ==(other)
other.is_a?(self.class) && id == other.id
end
|
#[](key) ⇒ Object
160
161
162
|
# File 'lib/delayed/backend/redis/job.rb', line 160
def [](key)
send(key)
end
|
#[]=(key, value) ⇒ Object
164
165
166
|
# File 'lib/delayed/backend/redis/job.rb', line 164
def []=(key, value)
send("#{key}=", value)
end
|
#changes_applied ⇒ Object
397
398
399
400
|
# File 'lib/delayed/backend/redis/job.rb', line 397
def changes_applied
@previously_changed = changes
@changed_attributes.clear
end
|
#create_and_lock!(worker_name) ⇒ Object
421
422
423
424
|
# File 'lib/delayed/backend/redis/job.rb', line 421
def create_and_lock!(worker_name)
raise "job already exists" unless new_record?
lock_in_redis!(worker_name)
end
|
#destroy ⇒ Object
407
408
409
410
411
|
# File 'lib/delayed/backend/redis/job.rb', line 407
def destroy
self.class.functions.destroy_job(id, self.class.db_time_now)
@destroyed = true
freeze
end
|
#destroyed? ⇒ Boolean
180
181
182
|
# File 'lib/delayed/backend/redis/job.rb', line 180
def destroyed?
!!@destroyed
end
|
#fail! ⇒ Object
426
427
428
429
430
431
432
|
# File 'lib/delayed/backend/redis/job.rb', line 426
def fail!
self.failed_at = self.class.db_time_now
save!
redis.rename Keys::JOB[id], Keys::FAILED_JOB[id]
tickle_strand
self
end
|
#hash ⇒ Object
188
189
190
|
# File 'lib/delayed/backend/redis/job.rb', line 188
def hash
id.hash
end
|
#lock_in_redis!(worker_name) ⇒ Object
372
373
374
375
376
|
# File 'lib/delayed/backend/redis/job.rb', line 372
def lock_in_redis!(worker_name)
self.locked_at = self.class.db_time_now
self.locked_by = worker_name
save
end
|
#new_record? ⇒ Boolean
176
177
178
|
# File 'lib/delayed/backend/redis/job.rb', line 176
def new_record?
!!@new_record
end
|
#save(*a) ⇒ Object
383
384
385
386
387
388
389
390
391
392
393
394
|
# File 'lib/delayed/backend/redis/job.rb', line 383
def save(*a)
return false if destroyed?
result = run_callbacks(:save) do
if new_record?
run_callbacks(:create) { create }
else
update
end
end
changes_applied
result
end
|
#save!(*a) ⇒ Object
403
404
405
|
# File 'lib/delayed/backend/redis/job.rb', line 403
def save!(*a)
save(*a) || raise(RecordNotSaved)
end
|
#tickle_strand ⇒ Object
take this job off the strand, and queue up the next strand job if this job was at the front
415
416
417
418
419
|
# File 'lib/delayed/backend/redis/job.rb', line 415
def tickle_strand
if strand.present?
self.class.functions.tickle_strand(id, strand, self.class.db_time_now)
end
end
|
#transfer_lock!(from:, to:) ⇒ Object
368
369
370
|
# File 'lib/delayed/backend/redis/job.rb', line 368
def transfer_lock!(from:, to:)
lock_in_redis!(to)
end
|
#unlock! ⇒ Object
378
379
380
381
|
# File 'lib/delayed/backend/redis/job.rb', line 378
def unlock!
unlock
save!
end
|