Class: GoodJob::Bulk::Buffer
- Inherits:
-
Object
- Object
- GoodJob::Bulk::Buffer
- Defined in:
- lib/good_job/bulk.rb
Instance Method Summary collapse
- #active_jobs ⇒ Object
- #active_jobs_by_queue_adapter ⇒ Object
- #add(active_jobs, queue_adapter: nil) ⇒ Object
- #capture ⇒ Object
- #enqueue ⇒ Object
-
#initialize ⇒ Buffer
constructor
A new instance of Buffer.
Constructor Details
#initialize ⇒ Buffer
Returns a new instance of Buffer.
63 64 65 |
# File 'lib/good_job/bulk.rb', line 63 def initialize @values = [] end |
Instance Method Details
#active_jobs ⇒ Object
120 121 122 |
# File 'lib/good_job/bulk.rb', line 120 def active_jobs @values.map(&:first) end |
#active_jobs_by_queue_adapter ⇒ Object
113 114 115 116 117 118 |
# File 'lib/good_job/bulk.rb', line 113 def active_jobs_by_queue_adapter @values.each_with_object({}) do |(job, adapter), memo| memo[adapter] ||= [] memo[adapter] << job end end |
#add(active_jobs, queue_adapter: nil) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/good_job/bulk.rb', line 75 def add(active_jobs, queue_adapter: nil) new_pairs = Array(active_jobs).map do |active_job| raise ArgumentError, "Expected an ActiveJob::Base instance, got #{active_job.class}" unless active_job.is_a?(ActiveJob::Base) adapter = queue_adapter || active_job.class.queue_adapter raise Error, "Jobs must have a Queue Adapter" unless adapter [active_job, adapter] end @values.append(*new_pairs) true end |
#capture ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/good_job/bulk.rb', line 67 def capture original_buffer = Bulk.current_buffer Bulk.current_buffer = self yield ensure Bulk.current_buffer = original_buffer end |
#enqueue ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/good_job/bulk.rb', line 89 def enqueue Bulk.unbuffer do active_jobs_by_queue_adapter.each do |adapter, jobs| jobs = jobs.reject(&:provider_job_id) # Do not re-enqueue already enqueued jobs if adapter.respond_to?(:enqueue_all) unbulkable_jobs, bulkable_jobs = jobs.partition do |job| job.respond_to?(:good_job_concurrency_key) && job.good_job_concurrency_key && (job.class.good_job_concurrency_config[:enqueue_limit] || job.class.good_job_concurrency_config[:total_limit]) end adapter.enqueue_all(bulkable_jobs) if bulkable_jobs.any? else unbulkable_jobs = jobs end unbulkable_jobs.each do |job| job.enqueue rescue GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError # ignore end end end end |