Class: Sidekiq::WebCustom::Processor
- Inherits:
- Processor
  - Object
  - Processor
  - Sidekiq::WebCustom::Processor
- Defined in:
- lib/sidekiq/web_custom/processor.rb
Class Method Summary collapse
- .__processor__(queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
- .execute(max:, queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
- .execute_job(job:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
Instance Method Summary collapse
- #__execute(max:) ⇒ Object
- #__execute_job(job:) ⇒ Object
-
#initialize(options:, queue:) ⇒ Processor
constructor
A new instance of Processor.
Constructor Details
#initialize(options:, queue:) ⇒ Processor
Returns a new instance of Processor.
26 27 28 29 30 31 |
# File 'lib/sidekiq/web_custom/processor.rb', line 26
#
# Builds a processor bound to a single queue.
#
# @param options [#fetcher] configuration object; the class-level callers
#   default it to Sidekiq.default_configuration.default_capsule
# @param queue [Object] queue handle stored for later use by the instance
def initialize(options:, queue:)
  @__queue = queue
  # True only when the configured fetcher is exactly BasicFetch (not a subclass).
  @__basic_fetch = options.fetcher.instance_of?(BasicFetch)
  # NOTE(review): the rendered listing shows `super()`, but every `options`
  # token on this page was dropped by the extraction; presumably the parent
  # constructor receives `options` -- confirm against the real source file.
  super(options)
end
Class Method Details
.__processor__(queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/sidekiq/web_custom/processor.rb', line 17
#
# Builds a processor restricted to a single queue.
#
# The incoming options object is duplicated before its queue list is
# overwritten, so the assignment lands on the copy rather than on the
# object the caller passed in.
#
# @param queue [String, #name] a queue name (wrapped in Sidekiq::Queue) or an
#   object that already responds to +name+
# @param options [#dup] configuration object whose copy receives +queues=+
# @return [Processor] whatever +new+ returns for the scoped options and queue
def self.__processor__(queue:, options: Sidekiq.default_configuration.default_capsule)
  scoped_options = options.dup
  queue = Sidekiq::Queue.new(queue) if queue.is_a?(String)
  scoped_options.queues = [queue.name]
  new(options: scoped_options, queue: queue)
end
.execute(max:, queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
7 8 9 |
# File 'lib/sidekiq/web_custom/processor.rb', line 7
#
# Manually processes up to +max+ items from +queue+.
#
# @param max [Integer] upper bound on the number of items to process
# @param queue [String, #name] queue name or queue object
# @param options [Object] configuration forwarded to +__processor__+
# @return [Object] the result of +#__execute+ on the built processor
def self.execute(max:, queue:, options: Sidekiq.default_configuration.default_capsule)
  __processor__(queue: queue, options: options).__execute(max: max)
end
.execute_job(job:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object
11 12 13 14 15 |
# File 'lib/sidekiq/web_custom/processor.rb', line 11
#
# Manually processes one specific job.
#
# @param job [#queue] the job to run; its +queue+ selects the processor's queue
# @param options [Object] configuration forwarded to +__processor__+
# @return [Object, false] the result of +#__execute_job+, or +false+ when any
#   StandardError is raised while building the processor or running the job
def self.execute_job(job:, options: Sidekiq.default_configuration.default_capsule)
  __processor__(queue: job.queue, options: options).__execute_job(job: job)
rescue StandardError
  false # error gets logged downstream
end
Instance Method Details
#__execute(max:) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/sidekiq/web_custom/processor.rb', line 55
#
# Processes up to +max+ items from the bound queue, one at a time.
#
# The loop stops early when the queue reports no remaining items or when the
# current thread's Sidekiq::WebCustom::BREAK_BIT thread-local is set.
#
# @param max [Integer] maximum number of items to process
# @return [Integer] number of items processed before stopping, including the
#   case where processing was interrupted by an exception
def __execute(max:)
  count = 0
  max.times do
    break if @__queue.size <= 0

    if Thread.current[Sidekiq::WebCustom::BREAK_BIT]
      logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{count} of attempted #{max}"
      break
    end

    logger.info { "Manually processing next item in queue:[#{@__queue.name}]" }
    process_one
    count += 1
  end

  count
rescue Exception => ex # rubocop:disable Lint/RescueException
  # NOTE(review): rescuing Exception also swallows SignalException/SystemExit.
  # Presumably intentional so that non-StandardError interrupts (e.g. a
  # request timeout) still yield a count -- confirm before narrowing.
  if @job && @__basic_fetch
    # A job was in flight when the interruption happened; report it as lost.
    logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}"
  end
  logger.warn "Manual execution has terminated. Received error [#{ex.message}]"
  count
end
#__execute_job(job:) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sidekiq/web_custom/processor.rb', line 33
#
# Processes a single job in place and removes it from its queue afterwards.
#
# The job is wrapped in a Sidekiq::BasicFetch::UnitOfWork built from its
# queue name and JSON-serialised item, handed to +process+, and deleted only
# after processing succeeded.
#
# @param job [#queue, #item, #delete] the job to run
# @return [true] when the job was processed and deleted
# @raise [StandardError] re-raises any error from processing (the job is not
#   deleted) or from deleting the job after it was processed
def __execute_job(job:)
  queue_name = "queue:#{job.queue}"
  work_unit = Sidekiq::BasicFetch::UnitOfWork.new(queue_name, job.item.to_json)

  begin
    logger.info "Manually processing individual work unit for #{work_unit.queue_name}"
    process(work_unit)
  rescue StandardError => e
    logger.error "Manually processed work unit failed with #{e.message}. Work unit will not be dequeued"
    raise
  end

  begin
    job.delete
    logger.info { "Manually processed work unit sucessfully dequeued." }
  rescue StandardError => e
    # The work already ran at this point, so a failed delete is logged as fatal.
    logger.fatal "Manually processed work unit failed to be dequeued. #{e.message}."
    raise
  end

  true
end