Class: Sidekiq::JobSet
Overview
Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead. Sidekiq Pro and Enterprise add additional sorted sets which do not contain job data, e.g. Batches.
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from SortedSet
Instance Method Summary
- #delete_by_jid(score, jid) ⇒ Object (also: #delete) private
  :nodoc:.
- #delete_by_value(name, value) ⇒ Object private
  :nodoc:.
- #each ⇒ Object
- #fetch(score, jid = nil) ⇒ Array<SortedEntry>
  Fetch jobs that match a given time or Range.
- #find_job(jid) ⇒ SortedEntry
  Find the job with the given JID within this sorted set.
- #schedule(timestamp, job) ⇒ Object
  Add a job with the associated timestamp to this set.
Methods inherited from SortedSet
#as_json, #clear, #initialize, #scan, #size
Constructor Details
This class inherits a constructor from Sidekiq::SortedSet
Instance Method Details
#delete_by_jid(score, jid) ⇒ Object Also known as: delete
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
:nodoc:
# File 'lib/sidekiq/api.rb', line 747

    def delete_by_jid(score, jid)
      Sidekiq.redis do |conn|
        elements = conn.zrange(name, score, score, "BYSCORE")
        elements.each do |element|
          if element.index(jid)
            message = Sidekiq.load_json(element)
            if message["jid"] == jid
              ret = conn.zrem(name, element)
              @_size -= 1 if ret
              break ret
            end
          end
        end
      end
    end
#delete_by_value(name, value) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
:nodoc:
# File 'lib/sidekiq/api.rb', line 737

    def delete_by_value(name, value)
      Sidekiq.redis do |conn|
        ret = conn.zrem(name, value)
        @_size -= 1 if ret
        ret
      end
    end
#each ⇒ Object
# File 'lib/sidekiq/api.rb', line 671

    def each
      initial_size = @_size
      offset_size = 0
      page = -1
      page_size = 50

      loop do
        range_start = page * page_size + offset_size
        range_end = range_start + page_size - 1
        elements = Sidekiq.redis { |conn|
          conn.zrange name, range_start, range_end, "withscores"
        }
        break if elements.empty?
        page -= 1
        elements.reverse_each do |element, score|
          yield SortedEntry.new(self, score, element)
        end
        offset_size = initial_size - @_size
      end
    end
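The loop above pages through the set from the end toward the front using negative indices, and shifts each window by the number of entries removed since iteration began. The following is a minimal sketch of that paging arithmetic over an in-memory Array. `FakeSortedSet` and its `zrange` and `delete` helpers are hypothetical stand-ins written for this illustration; they are not part of Sidekiq or of any Redis client, and the page size is reduced from 50 to 3 so the windows are easy to follow.

```ruby
# Hypothetical in-memory stand-in for a Redis sorted set, already ordered by score.
class FakeSortedSet
  include Enumerable

  PAGE_SIZE = 3 # the method above uses 50; kept small here for illustration

  def initialize(members)
    @members = members.dup
    @_size = @members.size
  end

  # Emulates index-based ZRANGE: inclusive bounds, negative indices count from the end.
  def zrange(start, stop)
    len = @members.size
    start += len if start < 0
    stop += len if stop < 0
    start = 0 if start < 0
    return [] if start > stop || start >= len
    stop = len - 1 if stop >= len
    @members[start..stop]
  end

  # Removes a member and keeps the cached size in step, as the delete methods above do.
  def delete(member)
    @_size -= 1 if @members.delete(member)
  end

  # Same paging arithmetic as the method above, minus Redis and SortedEntry.
  def each
    initial_size = @_size
    offset_size = 0
    page = -1

    loop do
      range_start = page * PAGE_SIZE + offset_size
      range_end = range_start + PAGE_SIZE - 1
      elements = zrange(range_start, range_end)
      break if elements.empty?
      page -= 1
      elements.reverse_each { |element| yield element }
      # Entries deleted so far shrink the set, so shift the next window accordingly.
      offset_size = initial_size - @_size
    end
  end
end
```

In this sketch, iterating a seven-member set visits the members from the last position to the first, and the same order results when every yielded member is deleted inside the block, because `offset_size` compensates for the shrinking set.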
#fetch(score, jid = nil) ⇒ Array<SortedEntry>
Fetch jobs that match a given time or Range. Job ID is an optional second argument.
# File 'lib/sidekiq/api.rb', line 699

    def fetch(score, jid = nil)
      begin_score, end_score =
        if score.is_a?(Range)
          [score.first, score.last]
        else
          [score, score]
        end

      elements = Sidekiq.redis { |conn|
        conn.zrange(name, begin_score, end_score, "BYSCORE", "withscores")
      }

      elements.each_with_object([]) do |element, result|
        data, job_score = element
        entry = SortedEntry.new(self, job_score, data)
        result << entry if jid.nil? || entry.jid == jid
      end
    end
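To make the argument handling concrete, here is a small sketch of the same normalize-then-filter logic applied to an in-memory list instead of Redis. `fetch_entries` and `Entry` are hypothetical names introduced only for this example, and `pairs` is assumed to be an Array of `[json_payload, score]` elements, the shape the method above destructures.

```ruby
require "json"

# Hypothetical stand-in for SortedEntry: a score plus the parsed job hash.
Entry = Struct.new(:score, :item) do
  def jid
    item["jid"]
  end
end

# pairs: Array of [json_payload, score]. score: a single number or a Range.
def fetch_entries(pairs, score, jid = nil)
  # A single score is treated as a range whose bounds are equal.
  begin_score, end_score =
    if score.is_a?(Range)
      [score.first, score.last]
    else
      [score, score]
    end

  # Both bounds are inclusive, matching a BYSCORE query with default bounds.
  in_range = pairs.select { |_, s| s >= begin_score && s <= end_score }

  in_range.each_with_object([]) do |(data, job_score), result|
    entry = Entry.new(job_score, JSON.parse(data))
    # Without a jid every entry in range is kept; with one, only exact matches.
    result << entry if jid.nil? || entry.jid == jid
  end
end
```

Passing a single score returns every entry stored at exactly that score, which may be more than one job; the optional JID narrows the result to the matching job, if any.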
#find_job(jid) ⇒ SortedEntry
Find the job with the given JID within this sorted set. *This is a slow O(n) operation*. Do not use for app logic.
# File 'lib/sidekiq/api.rb', line 724

    def find_job(jid)
      Sidekiq.redis do |conn|
        conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score|
          job = Sidekiq.load_json(entry)
          matched = job["jid"] == jid
          return SortedEntry.new(self, score, entry) if matched
        end
      end
      nil
    end
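The method above matches in two stages: a glob pattern narrows the scan to payloads containing the JID as a substring, and the parsed "jid" field is then compared exactly, since the same string could appear elsewhere in a payload (for example in the job's arguments). A minimal in-memory sketch of that idea follows; `find_payload` is a hypothetical helper for illustration, and `String#include?` stands in for the `*jid*` glob match.

```ruby
require "json"

# Hypothetical helper: payloads is an Array of [json_payload, score] pairs.
def find_payload(payloads, jid)
  payloads.each do |entry, score|
    # Stage 1: cheap substring filter, playing the role of MATCH "*jid*".
    next unless entry.include?(jid)

    # Stage 2: parse and compare the actual "jid" field.
    job = JSON.parse(entry)
    return [score, job] if job["jid"] == jid
  end
  nil # every payload was inspected without a match, hence the O(n) cost
end
```

In this sketch, a payload whose arguments merely contain the JID string passes the first stage but is rejected by the second, so only the job whose "jid" field equals the argument is returned.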