Class: Delayed::Backend::Redis::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::Redis::Job
show all
- Extended by:
- ActiveModel::Callbacks
- Includes:
- ActiveModel::Dirty, Base
- Defined in:
- lib/delayed/backend/redis/job.rb
Direct Known Subclasses
Failed
Defined Under Namespace
Modules: Keys
Classes: Failed
Constant Summary
collapse
- WAITING_STRAND_JOB_PRIORITY =
2000000
- COLUMNS =
[]
- TIMESTAMP_COLUMNS =
We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.
[]
- INTEGER_COLUMNS =
[]
Constants included
from Base
Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY
Instance Attribute Summary collapse
Class Method Summary
collapse
-
.bulk_update(action, opts) ⇒ Object
Performs a bulk update on a set of jobs. action is :hold, :unhold, or :destroy. To specify the jobs to act on, pass either opts[:ids] = [list of job ids], or opts[:flavor] = <some flavor> to act on all jobs of that flavor.
-
.clear_locks!(worker_name) ⇒ Object
-
.column(name, type) ⇒ Object
-
.create(attrs = {}) ⇒ Object
-
.create!(attrs = {}) ⇒ Object
-
.create_singleton(options) ⇒ Object
-
.find(ids) ⇒ Object
-
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
-
.find_one(id, options) ⇒ Object
-
.find_some(ids, options) ⇒ Object
-
.functions ⇒ Object
-
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil) ⇒ Object
-
.get_with_ids(ids) ⇒ Object
-
.instantiate(attrs) ⇒ Object
-
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
Gets the total job count for the given flavor. flavor is :current, :future, or :failed. For the :failed flavor, queue is currently ignored.
-
.key_for_job_id(job_id) ⇒ Object
-
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
Gets a list of jobs of the given flavor in the given queue. flavor is :current, :future, :failed, :strand, or :tag. Depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue); for :strand, it’s the strand name; for :tag, it’s the tag name; for :failed, it’s ignored.
-
.reconnect! ⇒ Object
-
.running_jobs ⇒ Object
-
.strand_size(strand) ⇒ Object
-
.tag_counts(flavor, limit, offset = 0) ⇒ Object
Returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order. flavor is :current or :all.
-
.unlock(jobs) ⇒ Object
Instance Method Summary
collapse
Methods included from Base
#batch?, #expired?, #failed?, #full_name, #hold!, included, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock
Constructor Details
#initialize(attrs = {}) ⇒ Job
Returns a new instance of Job.
135
136
137
138
139
140
|
# File 'lib/delayed/backend/redis/job.rb', line 135
# Builds a new, unsaved job from the given attributes.
#
# @param attrs [Hash] attribute name => value; each pair is assigned through
#   the matching `<name>=` writer, so every key needs such a writer
def initialize(attrs = {})
  attrs.each do |attr_name, value|
    send("#{attr_name}=", value)
  end
  # Fall back to zero only when the caller left these unset.
  self.priority ||= 0
  self.attempts ||= 0
  # Flag consulted by #new_record?.
  @new_record = true
end
|
Instance Attribute Details
#on_conflict ⇒ Object
not saved, just used as a marker when creating
363
364
365
|
# File 'lib/delayed/backend/redis/job.rb', line 363
# Reader for the @on_conflict instance variable.
# Per the accompanying description, the value is not saved; it is only used
# as a marker when creating a job.
def on_conflict
@on_conflict
end
|
#singleton ⇒ Object
not saved, just used as a marker when creating
363
364
365
|
# File 'lib/delayed/backend/redis/job.rb', line 363
# Reader for the @singleton instance variable.
# Per the accompanying description, the value is not saved; it is only used
# as a marker when creating a job.
def singleton
@singleton
end
|
Class Method Details
.bulk_update(action, opts) ⇒ Object
Performs a bulk update on a set of jobs. action is :hold, :unhold, or :destroy. To specify the jobs to act on, pass either opts[:ids] = [list of job ids], or opts[:flavor] = <some flavor> to act on all jobs of that flavor.
See list_jobs for the list of available flavors and the meaning of opts[:query] for each.
346
347
348
349
350
351
|
# File 'lib/delayed/backend/redis/job.rb', line 346
# Performs a bulk update on a set of jobs by delegating to
# `functions.bulk_update`.
#
# @param action the operation to apply; the page describes :hold, :unhold
#   and :destroy
# @param opts [Hash] reads :ids, :flavor and :query. For the "current" and
#   "future" flavors a missing :query defaults to Delayed::Settings.queue.
# @return whatever `functions.bulk_update` returns
def self.bulk_update(action, opts)
  # Work on a local copy of the query so the caller's hash is never written
  # to; this also lets frozen option hashes be passed in.
  query = opts[:query]
  query ||= Delayed::Settings.queue if %w(current future).include?(opts[:flavor].to_s)
  functions.bulk_update(action, opts[:ids], opts[:flavor], query, db_time_now)
end
|
.clear_locks!(worker_name) ⇒ Object
320
321
322
323
324
325
326
|
# File 'lib/delayed/backend/redis/job.rb', line 320
# Unlocks every running job whose `locked_by` equals +worker_name+.
#
# @param worker_name the lock owner to match against each job's `locked_by`
# @return [nil]
def self.clear_locks!(worker_name)
  running_jobs.each do |running_job|
    next unless running_job.locked_by == worker_name

    running_job.unlock!
  end
  nil
end
|
.column(name, type) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/delayed/backend/redis/job.rb', line 97
# Declares a job attribute.
#
# Appends +name+ to COLUMNS, and additionally to TIMESTAMP_COLUMNS or
# INTEGER_COLUMNS when +type+ is :timestamp or :integer. It then defines a
# reader, registers the attribute via define_attribute_methods, and defines
# a writer that tracks changes.
#
# @param name attribute name
# @param type [Symbol] :timestamp and :integer get the extra bookkeeping
#   above; any other value is only recorded in COLUMNS
def self.column(name, type)
COLUMNS << name
if type == :timestamp
TIMESTAMP_COLUMNS << name
elsif type == :integer
INTEGER_COLUMNS << name
end
attr_reader(name)
define_attribute_methods([name])
# The generated writer calls <name>_will_change! only when the new value
# differs from the current one, then stores it in the instance variable.
class_eval(<<-EOS, __FILE__, __LINE__ + 1)
def #{name}=(new_value)
#{name}_will_change! unless new_value == self.#{name}
@#{name} = new_value
end
EOS
end
|
.create(attrs = {}) ⇒ Object
148
149
150
151
152
|
# File 'lib/delayed/backend/redis/job.rb', line 148
# Builds a job from +attrs+ and calls `save` on it.
#
# @param attrs [Hash] attributes forwarded to `new`
# @return the built job, regardless of what `save` returned
def self.create(attrs = {})
  new(attrs).tap { |job| job.save }
end
|
.create!(attrs = {}) ⇒ Object
154
155
156
157
158
|
# File 'lib/delayed/backend/redis/job.rb', line 154
# Builds a job from +attrs+ and calls `save!` on it.
#
# @param attrs [Hash] attributes forwarded to `new`
# @return the built job; any error raised by `save!` propagates
def self.create!(attrs = {})
  new(attrs).tap { |job| job.save! }
end
|
.create_singleton(options) ⇒ Object
353
354
355
|
# File 'lib/delayed/backend/redis/job.rb', line 353
# Creates a job via `create!` with the :singleton marker set to true.
#
# @param options [Hash] job attributes; not modified (a merged copy is used)
# @return the result of `create!`
def self.create_singleton(options)
  singleton_attrs = options.merge(singleton: true)
  create!(singleton_attrs)
end
|
.find(ids) ⇒ Object
168
169
170
171
172
173
174
|
# File 'lib/delayed/backend/redis/job.rb', line 168
# Looks up one job or several, depending on the argument.
#
# @param ids an Array of ids (dispatches to `find_some`) or a single id
#   (dispatches to `find_one`)
# @return the result of the chosen finder
def self.find(ids)
  case ids
  when Array then find_some(ids, {})
  else find_one(ids, {})
  end
end
|
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
249
250
251
252
253
254
255
256
257
258
|
# File 'lib/delayed/backend/redis/job.rb', line 249
# Finds jobs reported by `functions.find_available` for the given queue and
# priority range, starting at offset 0.
#
# @param limit maximum number of jobs, forwarded to `functions.find_available`
# @param queue queue name; validated with `check_queue`
# @param min_priority lower priority bound; validated with `check_priorities`
# @param max_priority upper priority bound; validated with `check_priorities`
# @return the result of `find` on the ids returned by the function
def self.find_available(limit,
                        queue = Delayed::Settings.queue,
                        min_priority = Delayed::MIN_PRIORITY,
                        max_priority = Delayed::MAX_PRIORITY)
  check_queue(queue)
  check_priorities(min_priority, max_priority)
  available_ids = functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now)
  find(available_ids)
end
|
.find_one(id, options) ⇒ Object
204
205
206
207
|
# File 'lib/delayed/backend/redis/job.rb', line 204
# Fetches a single job by id.
#
# @param id the job id
# @param options unused by this method
# @return the job returned by `get_with_ids`
# @raise [ActiveRecord::RecordNotFound] when no job comes back for +id+
def self.find_one(id, options)
  found = get_with_ids([id]).first
  raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}") unless found

  found
end
|
.find_some(ids, options) ⇒ Object
209
210
211
|
# File 'lib/delayed/backend/redis/job.rb', line 209
# Fetches several jobs by id, dropping ids for which nothing was found.
#
# @param ids [Array] job ids
# @param options unused by this method
# @return [Array] the non-nil results of `get_with_ids`
def self.find_some(ids, options)
  fetched = get_with_ids(ids)
  fetched.compact
end
|
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil) ⇒ Object
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/delayed/backend/redis/job.rb', line 221
# Asks `functions.get_and_lock_next_available` for the next job in the given
# queue and priority range, and instantiates it from the returned attributes.
#
# @param worker_name [String, Array] a worker name, or an Array of names of
#   which only the first is used for locking
# @param queue queue name; validated with `check_queue`
# @param min_priority lower priority bound; validated with `check_priorities`
# @param max_priority upper priority bound; validated with `check_priorities`
# @param prefetch accepted but not used by this method
# @param prefetch_owner accepted but not used by this method
# @return for a single worker name, the result of `instantiate_from_attrs`
#   (the nil check below suggests it can be nil); for an Array of names, a
#   Hash of `{ first_worker_name => job }`, or `{}` when no job came back
def self.get_and_lock_next_available(worker_name,
                                     queue = Delayed::Settings.queue,
                                     min_priority = Delayed::MIN_PRIORITY,
                                     max_priority = Delayed::MAX_PRIORITY,
                                     prefetch: nil,
                                     prefetch_owner: nil)
  check_queue(queue)
  check_priorities(min_priority, max_priority)

  multiple_workers = worker_name.is_a?(Array)
  worker_name = worker_name.first if multiple_workers

  job_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, db_time_now)
  # Always instantiate; only the *shape* of the return value depends on
  # whether the caller passed a list of workers.
  job = instantiate_from_attrs(job_attrs)
  return job unless multiple_workers

  job.nil? ? {} : { worker_name => job }
end
|
.get_with_ids(ids) ⇒ Object
213
214
215
|
# File 'lib/delayed/backend/redis/job.rb', line 213
# Loads each id's hash from redis and instantiates a job from it.
#
# @param ids [Array] job ids
# @return [Array] one `instantiate_from_attrs` result per id, in input order
def self.get_with_ids(ids)
  ids.map do |job_id|
    job_attrs = redis.hgetall(key_for_job_id(job_id))
    instantiate_from_attrs(job_attrs)
  end
end
|
.instantiate(attrs) ⇒ Object
142
143
144
145
146
|
# File 'lib/delayed/backend/redis/job.rb', line 142
# Builds a job from +attrs+ and marks it as not being a new record.
#
# @param attrs [Hash] attributes forwarded to `new`
# @return the built job with @new_record set to false
def self.instantiate(attrs)
  new(attrs).tap do |job|
    job.instance_variable_set(:@new_record, false)
  end
end
|
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
Gets the total job count for the given flavor. flavor is :current, :future, or :failed. For the :failed flavor, queue is currently ignored.
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
|
# File 'lib/delayed/backend/redis/job.rb', line 296
# Returns the cardinality (redis ZCARD) of the sorted set backing a flavor.
#
# @param flavor :current, :future or :failed (compared via `to_s`)
# @param queue queue name; validated with `check_queue` for :current and
#   :future, and not read at all for :failed
# @return the result of `redis.zcard`
# @raise [ArgumentError] for any other flavor
def self.jobs_count(flavor,
                    queue = Delayed::Settings.queue)
  counted_key =
    case flavor.to_s
    when 'current'
      check_queue(queue)
      Keys::QUEUE[queue]
    when 'future'
      check_queue(queue)
      Keys::FUTURE_QUEUE[queue]
    when 'failed'
      Keys::FAILED_JOBS
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
    end
  redis.zcard(counted_key)
end
|
.key_for_job_id(job_id) ⇒ Object
217
218
219
|
# File 'lib/delayed/backend/redis/job.rb', line 217
# Returns the Keys::JOB entry for +job_id+; `get_with_ids` passes the result
# to `redis.hgetall` as the key of the job's hash.
def self.key_for_job_id(job_id)
Keys::JOB[job_id]
end
|
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
Gets a list of jobs of the given flavor in the given queue. flavor is :current, :future, :failed, :strand, or :tag. Depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue); for :strand, it’s the strand name; for :tag, it’s the tag name; for :failed, it’s ignored.
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
|
# File 'lib/delayed/backend/redis/job.rb', line 267
# Lists jobs of the given flavor, paginated by +limit+ and +offset+.
#
# @param flavor :current, :future, :failed, :strand or :tag (compared via
#   `to_s`)
# @param limit maximum number of jobs to return
# @param offset number of jobs to skip
# @param query meaning depends on the flavor: the queue name for :current and
#   :future (defaults to Delayed::Settings.queue), the strand name for
#   :strand, the tag name for :tag; not read for :failed
# @return the jobs found for the selected ids
# @raise [ArgumentError] for any other flavor
def self.list_jobs(flavor,
                   limit,
                   offset = 0,
                   query = nil)
  case flavor.to_s
  when 'current'
    query ||= Delayed::Settings.queue
    check_queue(query)
    self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
  when 'future'
    query ||= Delayed::Settings.queue
    check_queue(query)
    self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
  when 'failed'
    Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
  when 'strand'
    self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
  when 'tag'
    ids = redis.smembers(Keys::TAG[query])
    # Array#[start, length] is nil once start is past the end of the array;
    # fall back to an empty page so `find` still takes its Array branch
    # instead of treating nil as a single id.
    self.find(ids[offset, limit] || [])
  else
    raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end
|
.reconnect! ⇒ Object
192
193
194
195
196
197
198
|
# File 'lib/delayed/backend/redis/job.rb', line 192
# Reconnects the redis connection.
#
# Calls `reconnect` on the redis object when it responds to it, otherwise on
# its `client`.
def self.reconnect!
  if redis.respond_to?(:reconnect)
    redis.reconnect
  else
    redis.client.reconnect
  end
end
|
.running_jobs ⇒ Object
316
317
318
|
# File 'lib/delayed/backend/redis/job.rb', line 316
# Finds the jobs whose ids are in the Keys::RUNNING_JOBS sorted set
# (scores 0 through +inf).
#
# @return the result of `find` on those ids
def self.running_jobs
  running_ids = redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf")
  find(running_ids)
end
|
.strand_size(strand) ⇒ Object
312
313
314
|
# File 'lib/delayed/backend/redis/job.rb', line 312
# Returns the length (redis LLEN) of the list stored for +strand+.
#
# @param strand the strand name
# @return the result of `redis.llen`
def self.strand_size(strand)
  strand_key = Keys::STRAND[strand]
  redis.llen(strand_key)
end
|
.tag_counts(flavor, limit, offset = 0) ⇒ Object
Returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order. flavor is :current or :all.
331
332
333
334
335
336
337
|
# File 'lib/delayed/backend/redis/job.rb', line 331
# Lists tags with their counts, highest count first.
#
# Reads the Keys::TAG_COUNTS sorted set for the flavor with
# ZREVRANGEBYSCORE from +inf down to 1, paginated by +limit+ and +offset+.
#
# @param flavor :current or :all (validated via `to_s`)
# @param limit maximum number of tags
# @param offset number of tags to skip
# @return [Array<Hash>] hashes of the form `{ :tag => ..., :count => ... }`
# @raise [ArgumentError] for any other flavor
def self.tag_counts(flavor,
                    limit,
                    offset = 0)
  unless %w(current all).include?(flavor.to_s)
    raise(ArgumentError, "invalid flavor: #{flavor.inspect}")
  end

  counts_key = Keys::TAG_COUNTS[flavor]
  scored_tags = redis.zrevrangebyscore(counts_key, '+inf', 1, limit: [offset, limit], withscores: true)
  scored_tags.map do |tag, count|
    { tag: tag, count: count }
  end
end
|
.unlock(jobs) ⇒ Object
357
358
359
360
|
# File 'lib/delayed/backend/redis/job.rb', line 357
# Calls `unlock!` on each of the given jobs.
#
# @param jobs a collection of jobs
# @return the collection's `length`
def self.unlock(jobs)
  jobs.each { |job| job.unlock! }
  jobs.length
end
|
Instance Method Details
#==(other) ⇒ Object
184
185
186
|
# File 'lib/delayed/backend/redis/job.rb', line 184
def ==(other)
other.is_a?(self.class) && id == other.id
end
|
#[](key) ⇒ Object
160
161
162
|
# File 'lib/delayed/backend/redis/job.rb', line 160
# Hash-style attribute read: invokes the method named +key+ on this job via
# `send` and returns its result.
def [](key)
send(key)
end
|
#[]=(key, value) ⇒ Object
164
165
166
|
# File 'lib/delayed/backend/redis/job.rb', line 164
# Hash-style attribute write: invokes the `<key>=` writer on this job via
# `send`, passing +value+.
def []=(key, value)
send("#{key}=", value)
end
|
#changes_applied ⇒ Object
394
395
396
397
|
# File 'lib/delayed/backend/redis/job.rb', line 394
# Snapshots the current `changes` into @previously_changed and then empties
# @changed_attributes. Called by #save after the save callbacks have run.
# NOTE(review): presumably mirrors ActiveModel::Dirty's change bookkeeping
# (the class includes ActiveModel::Dirty) — confirm against the ActiveModel
# version in use.
def changes_applied
@previously_changed = changes
@changed_attributes.clear
end
|
#create_and_lock!(worker_name) ⇒ Object
418
419
420
421
|
# File 'lib/delayed/backend/redis/job.rb', line 418
# Locks a not-yet-saved job for +worker_name+ via #lock_in_redis!.
#
# @param worker_name the worker taking the lock
# @return the result of #lock_in_redis!
# @raise [RuntimeError] when the job is not a new record
def create_and_lock!(worker_name)
  if new_record?
    lock_in_redis!(worker_name)
  else
    raise "job already exists"
  end
end
|
#destroy ⇒ Object
404
405
406
407
408
|
# File 'lib/delayed/backend/redis/job.rb', line 404
# Destroys the job through the class-level `functions.destroy_job`, marks
# this instance as destroyed, and freezes it.
#
# @return [self] the frozen job (the result of `freeze`)
def destroy
  job_class = self.class
  job_class.functions.destroy_job(id, job_class.db_time_now)
  @destroyed = true
  freeze
end
|
#destroyed? ⇒ Boolean
180
181
182
|
# File 'lib/delayed/backend/redis/job.rb', line 180
# Whether #destroy has flagged this job as destroyed.
#
# @return [Boolean] true when @destroyed is truthy, false otherwise
def destroyed?
  @destroyed ? true : false
end
|
#fail! ⇒ Object
423
424
425
426
427
428
429
|
# File 'lib/delayed/backend/redis/job.rb', line 423
# Marks the job as failed.
#
# Stamps `failed_at` with the class's `db_time_now`, saves, renames the
# job's redis key from Keys::JOB to Keys::FAILED_JOB, and then calls
# #tickle_strand.
#
# @return [self]
def fail!
  failure_time = self.class.db_time_now
  self.failed_at = failure_time
  save!
  # Move the stored record under the failed-job key for the same id.
  redis.rename(Keys::JOB[id], Keys::FAILED_JOB[id])
  tickle_strand
  self
end
|
#hash ⇒ Object
188
189
190
|
# File 'lib/delayed/backend/redis/job.rb', line 188
# Hash code derived solely from the job's id, matching #== which also
# compares jobs by id.
def hash
id.hash
end
|
#lock_in_redis!(worker_name) ⇒ Object
369
370
371
372
373
|
# File 'lib/delayed/backend/redis/job.rb', line 369
# Records a lock on this job and saves it.
# Sets locked_at to the class's db_time_now and locked_by to +worker_name+.
# Returns the result of #save (non-raising, despite the bang in the name).
def lock_in_redis!(worker_name)
self.locked_at = self.class.db_time_now
self.locked_by = worker_name
save
end
|
#new_record? ⇒ Boolean
176
177
178
|
# File 'lib/delayed/backend/redis/job.rb', line 176
# Whether this job is flagged as a new record (set in #initialize, cleared
# by .instantiate).
#
# @return [Boolean] true when @new_record is truthy, false otherwise
def new_record?
  @new_record ? true : false
end
|
#save(*a) ⇒ Object
380
381
382
383
384
385
386
387
388
389
390
391
|
# File 'lib/delayed/backend/redis/job.rb', line 380
# Persists the job inside the :save callbacks.
#
# New records go through `create` wrapped in the :create callbacks; existing
# records go through `update`. Afterwards #changes_applied is called.
#
# @param a accepted but not used by this method
# @return false when the job is destroyed; otherwise the result of
#   `run_callbacks(:save)`
def save(*a)
  return false if destroyed?

  outcome = run_callbacks(:save) do
    new_record? ? run_callbacks(:create) { create } : update
  end
  changes_applied
  outcome
end
|
#save!(*a) ⇒ Object
400
401
402
|
# File 'lib/delayed/backend/redis/job.rb', line 400
# Like #save, but raises instead of returning a falsy result.
#
# @param a forwarded to #save
# @return the truthy result of #save
# @raise [RecordNotSaved] when #save returns false or nil
def save!(*a)
  saved = save(*a)
  raise(RecordNotSaved) unless saved

  saved
end
|
#tickle_strand ⇒ Object
take this job off the strand, and queue up the next strand job if this job was at the front
412
413
414
415
416
|
# File 'lib/delayed/backend/redis/job.rb', line 412
# Takes this job off its strand and queues up the next strand job if this
# one was at the front, by delegating to the class-level
# `functions.tickle_strand`. Does nothing when the job has no strand.
#
# @return the function's result, or nil when `strand` is not present
def tickle_strand
  return unless strand.present?

  self.class.functions.tickle_strand(id, strand, self.class.db_time_now)
end
|
#transfer_lock!(from:, to:) ⇒ Object
365
366
367
|
# File 'lib/delayed/backend/redis/job.rb', line 365
# Re-locks this job for the worker named by +to+ via #lock_in_redis!.
# The +from+ keyword is required by the signature but is not read here.
def transfer_lock!(from:, to:)
lock_in_redis!(to)
end
|
#unlock! ⇒ Object
375
376
377
378
|
# File 'lib/delayed/backend/redis/job.rb', line 375
# Calls #unlock (listed among the methods included from Base) and then
# persists the job with #save!, which raises if the save fails.
def unlock!
unlock
save!
end
|