Class: Sidekiq::JobSet
Overview
Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead. Sidekiq Pro and Enterprise add additional sorted sets which do not contain job data, e.g. Batches.
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from SortedSet
Instance Method Summary collapse
- #each ⇒ Object
-
#fetch(score, jid = nil) ⇒ Array<SortedEntry>
Fetch jobs that match a given time or Range.
-
#find_job(jid) ⇒ SortedEntry
Find the job with the given JID within this sorted set.
-
#kill_all(notify_failure: false, ex: nil) ⇒ Object
Move all jobs from this Set to the Dead Set.
- #pop_each ⇒ Object
- #retry_all ⇒ Object
-
#schedule(timestamp, job) ⇒ Object
Add a job with the associated timestamp to this set.
Methods inherited from SortedSet
Instance Method Details
#each ⇒ Object
704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 |
# File 'lib/sidekiq/api.rb', line 704 def each initial_size = @_size offset_size = 0 page = -1 page_size = 50 loop do range_start = page * page_size + offset_size range_end = range_start + page_size - 1 elements = Sidekiq.redis { |conn| conn.zrange name, range_start, range_end, "withscores" } break if elements.empty? page -= 1 elements.reverse_each do |element, score| yield SortedEntry.new(self, score, element) end offset_size = initial_size - @_size end end |
#fetch(score, jid = nil) ⇒ Array<SortedEntry>
Fetch jobs that match a given time or Range. Job ID is an optional second argument.
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 |
# File 'lib/sidekiq/api.rb', line 732 def fetch(score, jid = nil) begin_score, end_score = if score.is_a?(Range) [score.first, score.last] else [score, score] end elements = Sidekiq.redis { |conn| conn.zrange(name, begin_score, end_score, "BYSCORE", "withscores") } elements.each_with_object([]) do |element, result| data, job_score = element entry = SortedEntry.new(self, job_score, data) result << entry if jid.nil? || entry.jid == jid end end |
#find_job(jid) ⇒ SortedEntry
Find the job with the given JID within this sorted set. *This is a slow O(n) operation*. Do not use for app logic.
757 758 759 760 761 762 763 764 765 766 |
# File 'lib/sidekiq/api.rb', line 757 def find_job(jid) Sidekiq.redis do |conn| conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score| job = Sidekiq.load_json(entry) matched = job["jid"] == jid return SortedEntry.new(self, score, entry) if matched end end nil end |
#kill_all(notify_failure: false, ex: nil) ⇒ Object
Move all jobs from this Set to the Dead Set. See DeadSet#kill
691 692 693 694 695 696 697 698 699 700 701 702 |
# File 'lib/sidekiq/api.rb', line 691 def kill_all(notify_failure: false, ex: nil) ds = DeadSet.new opts = {notify_failure: notify_failure, ex: ex, trim: false} begin pop_each do |msg, _| ds.kill(msg, opts) end ensure ds.trim end end |
#pop_each ⇒ Object
669 670 671 672 673 674 675 676 677 |
# File 'lib/sidekiq/api.rb', line 669 def pop_each Sidekiq.redis do |c| size.times do data, score = c.zpopmin(name, 1)&.first break unless data yield data, score end end end |
#retry_all ⇒ Object
679 680 681 682 683 684 685 686 687 |
# File 'lib/sidekiq/api.rb', line 679 def retry_all c = Sidekiq::Client.new pop_each do |msg, _| job = Sidekiq.load_json(msg) # Manual retries should not count against the retry limit. job["retry_count"] -= 1 if job["retry_count"] c.push(job) end end |