Class: Sidekiq::JobSet

Inherits:
SortedSet show all
Defined in:
lib/sidekiq/api.rb

Direct Known Subclasses

DeadSet, RetrySet, ScheduledSet

Instance Attribute Summary

Attributes inherited from SortedSet

#name

Instance Method Summary collapse

Methods inherited from SortedSet

#clear, #initialize, #size

Constructor Details

This class inherits a constructor from Sidekiq::SortedSet

Instance Method Details

#delete(score, jid = nil) ⇒ Object



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/sidekiq/api.rb', line 361

def delete(score, jid = nil)
  if jid
    elements = Sidekiq.redis do |conn|
      conn.zrangebyscore(name, score, score)
    end

    elements_with_jid = elements.map do |element|
      message = Sidekiq.load_json(element)

      if message["jid"] == jid
        _, @_size = Sidekiq.redis do |conn|
          conn.multi do
            conn.zrem(name, element)
            conn.zcard name
          end
        end
      end
    end
    elements_with_jid.count != 0
  else
    count, @_size = Sidekiq.redis do |conn|
      conn.multi do
        conn.zremrangebyscore(name, score, score)
        conn.zcard name
      end
    end
    count != 0
  end
end

#each(&block) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/sidekiq/api.rb', line 320

def each(&block)
  initial_size = @_size
  offset_size = 0
  page = -1
  page_size = 50

  loop do
    range_start = page * page_size + offset_size
    range_end   = page * page_size + offset_size + (page_size - 1)
    elements = Sidekiq.redis do |conn|
      conn.zrange name, range_start, range_end, :with_scores => true
    end
    break if elements.empty?
    page -= 1
    elements.each do |element, score|
      block.call SortedEntry.new(self, score, element)
    end
    offset_size = initial_size - @_size
  end
end

#fetch(score, jid = nil) ⇒ Object



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/sidekiq/api.rb', line 341

def fetch(score, jid = nil)
  elements = Sidekiq.redis do |conn|
    conn.zrangebyscore(name, score, score)
  end

  elements.inject([]) do |result, element|
    entry = SortedEntry.new(self, score, element)
    if jid
      result << entry if entry.jid == jid
    else
      result << entry
    end
    result
  end
end

#find_job(jid) ⇒ Object



357
358
359
# File 'lib/sidekiq/api.rb', line 357

def find_job(jid)
  self.detect { |j| j.jid == jid }
end

#schedule(timestamp, message) ⇒ Object



314
315
316
317
318
# File 'lib/sidekiq/api.rb', line 314

def schedule(timestamp, message)
  Sidekiq.redis do |conn|
    conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
  end
end