Class: Sidekiq::JobSet
Instance Attribute Summary
Attributes inherited from SortedSet
#name
Instance Method Summary
collapse
Methods inherited from SortedSet
#clear, #initialize, #size
Instance Method Details
#delete(score, jid = nil) ⇒ Object
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
|
# File 'lib/sidekiq/api.rb', line 427
# Removes entries stored in the sorted set +name+ at exactly +score+.
#
# With a +jid+, every member at that score is decoded with
# Sidekiq.load_json and only members whose "jid" field equals +jid+ are
# removed (ZREM). Without a +jid+, every member at that score is removed
# (ZREMRANGEBYSCORE). In both cases @_size is refreshed from a ZCARD issued
# in the same MULTI as the removal.
#
# @param score the score to match; used as both the lower and upper bound
# @param jid [String, nil] optional job id that narrows the removal
# @return [Boolean] with +jid+: true only if at least one member carrying
#   that jid was found at +score+ and a removal was issued for it;
#   without +jid+: true if ZREMRANGEBYSCORE reported a non-zero count
def delete(score, jid = nil)
  if jid
    elements = Sidekiq.redis do |conn|
      conn.zrangebyscore(name, score, score)
    end

    # Track matches explicitly: the number of members at this score says
    # nothing about whether any of them belongs to the requested jid.
    found = false
    elements.each do |element|
      message = Sidekiq.load_json(element)
      next unless message["jid"] == jid

      _, @_size = Sidekiq.redis do |conn|
        conn.multi do
          conn.zrem(name, element)
          conn.zcard name
        end
      end
      found = true
    end
    found
  else
    count, @_size = Sidekiq.redis do |conn|
      conn.multi do
        conn.zremrangebyscore(name, score, score)
        conn.zcard name
      end
    end
    count != 0
  end
end
|
#each(&block) ⇒ Object
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
# File 'lib/sidekiq/api.rb', line 386
# Calls the block once for every member of the sorted set +name+, passing
# SortedEntry.new(self, score, member).
#
# Members are read with ZRANGE in windows of 50. The first window is
# -50..-1 and each later window moves 50 positions further negative;
# Redis treats negative ZRANGE indexes as offsets from the end of the set,
# so windows are read from the tail toward the head. Iteration stops at
# the first empty window.
#
# After each window the start index is shifted by how much @_size has
# dropped since iteration began -- presumably so that entries the block
# deletes (which refresh @_size) do not cause unvisited members to be
# skipped.
#
# @yieldparam entry [SortedEntry]
# @return [nil]
def each(&block)
  size_at_start = @_size
  shrinkage = 0
  window = 50
  page = -1

  loop do
    start_index = page * window + shrinkage
    stop_index = start_index + (window - 1)

    batch = Sidekiq.redis do |conn|
      conn.zrange name, start_index, stop_index, :with_scores => true
    end
    break if batch.empty?

    page -= 1
    batch.each { |member, score| block.call SortedEntry.new(self, score, member) }

    # Re-read @_size: the block may have changed it while we were yielding.
    shrinkage = size_at_start - @_size
  end
end
|
#fetch(score, jid = nil) ⇒ Object
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
|
# File 'lib/sidekiq/api.rb', line 407
# Looks up the members stored in the sorted set +name+ at exactly +score+
# and wraps each one as SortedEntry.new(self, score, member).
#
# @param score the score to match; used as both range bounds
# @param jid optional job id; when truthy, only entries whose #jid equals
#   it are kept, otherwise every entry at that score is returned
# @return [Array<SortedEntry>] the matching entries (empty when none)
def fetch(score, jid = nil)
  members = Sidekiq.redis do |conn|
    conn.zrangebyscore(name, score, score)
  end

  members.each_with_object([]) do |member, matches|
    entry = SortedEntry.new(self, score, member)
    matches << entry if !jid || entry.jid == jid
  end
end
|
#find_job(jid) ⇒ Object
423
424
425
|
# File 'lib/sidekiq/api.rb', line 423
# Walks the set via #each and returns the first entry whose #jid equals
# +jid+.
#
# @param jid the job id to look for
# @return the first matching entry, or nil when no entry matches
def find_job(jid)
  find do |entry|
    entry.jid == jid
  end
end
|
#schedule(timestamp, message) ⇒ Object
380
381
382
383
384
|
# File 'lib/sidekiq/api.rb', line 380
# Adds +message+ to the sorted set +name+ with ZADD.
#
# @param timestamp anything responding to #to_f; its float value, rendered
#   as a string, is used as the score
# @param message the job payload; serialized with Sidekiq.dump_json
# @return whatever the Sidekiq.redis block yields back, i.e. the ZADD reply
def schedule(timestamp, message)
  Sidekiq.redis do |conn|
    score = timestamp.to_f.to_s
    payload = Sidekiq.dump_json(message)
    conn.zadd(name, score, payload)
  end
end
|