Class: Sidekiq::JobSet
Overview
Base class for all sorted sets that contain jobs, e.g. scheduled, retry and dead. Sidekiq Pro and Enterprise add further sorted sets that do not contain job data, e.g. Batches.
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from SortedSet
Instance Method Summary
- #delete_by_jid(score, jid) ⇒ Object (also: #delete) private
  :nodoc:
- #delete_by_value(name, value) ⇒ Object private
  :nodoc:
- #each ⇒ Object
- #fetch(score, jid = nil) ⇒ Array<SortedEntry>
  Fetch jobs that match a given time or Range.
- #find_job(jid) ⇒ SortedEntry
  Find the job with the given JID within this sorted set.
- #kill_all(notify_failure: false, ex: nil) ⇒ Object
  Move all jobs from this Set to the Dead Set.
- #pop_each ⇒ Object
- #retry_all ⇒ Object
- #schedule(timestamp, job) ⇒ Object
  Add a job with the associated timestamp to this set.
Methods inherited from SortedSet
#as_json, #clear, #initialize, #scan, #size
Constructor Details
This class inherits a constructor from Sidekiq::SortedSet
Instance Method Details
#delete_by_jid(score, jid) ⇒ Object
Also known as: delete
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
:nodoc:
# File 'lib/sidekiq/api.rb', line 872

def delete_by_jid(score, jid)
  Sidekiq.redis do |conn|
    elements = conn.zrange(name, score, score, "BYSCORE")
    elements.each do |element|
      if element.index(jid)
        message = Sidekiq.load_json(element)
        if message["jid"] == jid
          ret = conn.zrem(name, element)
          @_size -= 1 if ret
          break ret
        end
      end
    end
  end
end
#delete_by_value(name, value) ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
:nodoc:
# File 'lib/sidekiq/api.rb', line 862

def delete_by_value(name, value)
  Sidekiq.redis do |conn|
    ret = conn.zrem(name, value)
    @_size -= 1 if ret
    ret
  end
end
#each ⇒ Object
# File 'lib/sidekiq/api.rb', line 796

def each
  initial_size = @_size
  offset_size = 0
  page = -1
  page_size = 50

  loop do
    range_start = page * page_size + offset_size
    range_end = range_start + page_size - 1
    elements = Sidekiq.redis { |conn|
      conn.zrange name, range_start, range_end, "withscores"
    }
    break if elements.empty?
    page -= 1
    elements.reverse_each do |element, score|
      yield SortedEntry.new(self, score, element)
    end
    offset_size = initial_size - @_size
  end
end
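The listing above pages through the set from the highest rank downward and shifts each window by the number of entries removed during iteration. The following is a minimal in-memory sketch of that idea, not Sidekiq's implementation: `zrange_by_rank` and `each_descending` are hypothetical helpers, a plain sorted Ruby array stands in for the Redis sorted set, and the rank clamping is an assumption modelled on how Redis treats negative and out-of-range indexes.

```ruby
# Emulates rank-based ZRANGE on a sorted Ruby array (assumption: negative
# indexes count from the end and out-of-range indexes are clamped).
def zrange_by_rank(sorted, start, stop)
  len = sorted.length
  start += len if start < 0
  stop += len if stop < 0
  start = 0 if start < 0
  return [] if start > stop || start >= len
  stop = len - 1 if stop >= len
  sorted[start..stop]
end

# Walks the array from the last element to the first, one page at a time,
# using the same window arithmetic as the loop in JobSet#each.
def each_descending(sorted, page_size: 2)
  initial_size = sorted.length
  offset_size = 0
  page = -1

  loop do
    range_start = page * page_size + offset_size
    range_end = range_start + page_size - 1
    elements = zrange_by_rank(sorted, range_start, range_end)
    break if elements.empty?
    page -= 1
    elements.reverse_each { |element| yield element }
    # Deletions made by the block shift the negative ranks, so the next
    # window is moved by the number of entries removed so far.
    offset_size = initial_size - sorted.length
  end
end

jobs = [10, 20, 30, 40, 50] # hypothetical scores, already sorted
seen = []
each_descending(jobs) do |score|
  seen << score
  jobs.delete(score) if score == 40 # delete while iterating
end
```

In this sketch, dropping the `offset_size` correction would make the second window skip the entry with score 30, because the deletion moves every remaining element one rank closer to the end.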
#fetch(score, jid = nil) ⇒ Array<SortedEntry>
Fetch jobs that match a given time or Range. Job ID is an optional second argument.
# File 'lib/sidekiq/api.rb', line 824

def fetch(score, jid = nil)
  begin_score, end_score =
    if score.is_a?(Range)
      [score.first, score.last]
    else
      [score, score]
    end

  elements = Sidekiq.redis { |conn|
    conn.zrange(name, begin_score, end_score, "BYSCORE", "withscores")
  }

  elements.each_with_object([]) do |element, result|
    data, job_score = element
    entry = SortedEntry.new(self, job_score, data)
    result << entry if jid.nil? || entry.jid == jid
  end
end
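The way #fetch accepts either a single score or a Range, with an optional JID filter, can be illustrated without Redis. This is a minimal sketch under stated assumptions: `FakeSet` is a hypothetical in-memory stand-in, its entries are plain `[json_payload, score]` pairs rather than SortedEntry objects, and the job data is invented for the example.

```ruby
require "json"

# Hypothetical in-memory stand-in for a sorted set of jobs.
class FakeSet
  def initialize(pairs)
    @pairs = pairs.sort_by { |_, score| score }
  end

  # Same argument handling as JobSet#fetch: a single score becomes a
  # one-point interval, a Range contributes its two endpoints.
  def fetch(score, jid = nil)
    begin_score, end_score =
      if score.is_a?(Range)
        [score.first, score.last]
      else
        [score, score]
      end

    @pairs.select do |data, job_score|
      job_score.between?(begin_score, end_score) &&
        (jid.nil? || JSON.parse(data)["jid"] == jid)
    end
  end
end

set = FakeSet.new([
  [JSON.generate("jid" => "a1", "class" => "HardJob"), 100.0],
  [JSON.generate("jid" => "b2", "class" => "HardJob"), 100.0],
  [JSON.generate("jid" => "c3", "class" => "HardJob"), 250.5]
])

same_score = set.fetch(100.0)       # every job at exactly that score
by_jid = set.fetch(100.0, "b2")     # narrowed to one JID
in_range = set.fetch(50.0..300.0)   # every job inside the interval
```

Only `first` and `last` are read from the Range, so an exclusive Range such as `a...b` appears to be handled the same way as `a..b`, with both endpoints included.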
#find_job(jid) ⇒ SortedEntry
Find the job with the given JID within this sorted set. *This is a slow O(n) operation*. Do not use for app logic.
# File 'lib/sidekiq/api.rb', line 849

def find_job(jid)
  Sidekiq.redis do |conn|
    conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score|
      job = Sidekiq.load_json(entry)
      matched = job["jid"] == jid
      return SortedEntry.new(self, score, entry) if matched
    end
  end
  nil
end
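The `match: "*#{jid}*"` pattern is only a substring filter on the raw payload, which is why the listing parses each candidate and compares its "jid" field exactly. A minimal sketch of that two-step check, assuming an array of JSON strings in place of the Redis scan; `find_by_jid` and the payloads are hypothetical:

```ruby
require "json"

# Hypothetical helper: cheap substring prefilter, then an exact comparison
# on the parsed "jid" field. Visits every payload in the worst case.
def find_by_jid(payloads, jid)
  payloads.each do |raw|
    next unless raw.include?(jid)   # prefilter, may yield false positives
    job = JSON.parse(raw)
    return job if job["jid"] == jid # exact match decides
  end
  nil
end

payloads = [
  # The wanted JID appears here only as an argument, not as the job's JID.
  JSON.generate("jid" => "zzz999", "args" => ["abc123"]),
  JSON.generate("jid" => "abc123", "args" => [])
]

found = find_by_jid(payloads, "abc123")
missing = find_by_jid(payloads, "nope")
```

The first payload passes the substring prefilter but is rejected by the exact comparison, so the second payload is the one returned.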
#kill_all(notify_failure: false, ex: nil) ⇒ Object
Move all jobs from this Set to the Dead Set. See DeadSet#kill.
# File 'lib/sidekiq/api.rb', line 783

def kill_all(notify_failure: false, ex: nil)
  ds = DeadSet.new
  opts = {notify_failure: notify_failure, ex: ex, trim: false}

  begin
    pop_each do |msg, _|
      ds.kill(msg, opts)
    end
  ensure
    ds.trim
  end
end
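The listing passes `trim: false` for every job and calls `trim` once in an `ensure` block. The sketch below shows that pattern in isolation. `FakeDeadSet`, its `max:` cap and the `kill_all` helper are hypothetical stand-ins chosen for illustration; they do not reproduce DeadSet's real behavior or signature.

```ruby
# Hypothetical stand-in for a capped dead set; not Sidekiq's DeadSet.
class FakeDeadSet
  attr_reader :entries, :trim_calls

  def initialize(max:)
    @max = max
    @entries = []
    @trim_calls = 0
  end

  def kill(msg, trim: true)
    @entries << msg
    self.trim if trim
  end

  # Drops the oldest entries until the cap is respected.
  def trim
    @trim_calls += 1
    @entries.shift while @entries.size > @max
  end
end

# Moves every message without trimming per job, then trims exactly once,
# even if an error interrupts the loop.
def kill_all(messages, dead_set)
  begin
    messages.each { |msg| dead_set.kill(msg, trim: false) }
  ensure
    dead_set.trim
  end
end

ds = FakeDeadSet.new(max: 2)
kill_all(%w[a b c], ds)
```

Deferring the trim keeps the bulk move to a single cleanup pass instead of one per job, and the `ensure` still enforces the cap when the loop is interrupted.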
#pop_each ⇒ Object
# File 'lib/sidekiq/api.rb', line 761

def pop_each
  Sidekiq.redis do |c|
    size.times do
      data, score = c.zpopmin(name, 1)&.first
      break unless data
      yield data, score
    end
  end
end
#retry_all ⇒ Object
# File 'lib/sidekiq/api.rb', line 771

def retry_all
  c = Sidekiq::Client.new
  pop_each do |msg, _|
    job = Sidekiq.load_json(msg)
    # Manual retries should not count against the retry limit.
    job["retry_count"] -= 1 if job["retry_count"]
    c.push(job)
  end
end
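The payload adjustment in the listing can be tried on its own. This is a minimal sketch, assuming `JSON.parse` as a stand-in for `Sidekiq.load_json`; `prepare_for_manual_retry` is a hypothetical helper and the payloads are invented.

```ruby
require "json"

# Hypothetical helper mirroring the payload handling in JobSet#retry_all:
# parse the stored JSON and give back one retry attempt if a count exists.
def prepare_for_manual_retry(raw)
  job = JSON.parse(raw)
  job["retry_count"] -= 1 if job["retry_count"]
  job
end

retried = prepare_for_manual_retry('{"jid":"a1","class":"HardJob","retry_count":3}')
fresh = prepare_for_manual_retry('{"jid":"b2","class":"HardJob"}')
```

A job that already failed has its count lowered by one, while a job without a "retry_count" key is left unchanged.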