Class: Sidekiq::WebCustom::Processor

Inherits:
Processor
  • Object
show all
Defined in:
lib/sidekiq/web_custom/processor.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options:, queue:) ⇒ Processor

Returns a new instance of Processor.



26
27
28
29
30
31
# File 'lib/sidekiq/web_custom/processor.rb', line 26

# Builds a processor bound to a single queue.
#
# Remembers the queue and whether the configured fetcher is exactly
# BasicFetch (subclasses do not count), then hands +options+ to the
# superclass constructor.
#
# @param options [Object] forwarded unchanged to +super+; must respond to +fetcher+
# @param queue [Object] the queue this processor works on
def initialize(options:, queue:)
  @__queue = queue

  fetcher_class = options.fetcher.class
  @__basic_fetch = fetcher_class == BasicFetch

  super(options)
end

Class Method Details

.__processor__(queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/sidekiq/web_custom/processor.rb', line 17

# Creates a processor whose options are narrowed to one queue.
#
# The incoming +options+ object is duplicated (via +dup+) before its
# +queues+ are reassigned, so the reassignment is made on the copy.
#
# @param queue [String, #name] a queue name (wrapped in Sidekiq::Queue) or
#   an object that already responds to +name+
# @param options [Object] options object responding to +dup+ and +queues=+
# @return [Object] the result of +new+ with the narrowed options and the queue
def self.__processor__(queue:, options: Sidekiq.default_configuration.default_capsule)
  scoped_options = options.dup
  queue = Sidekiq::Queue.new(queue) if queue.is_a?(String)

  scoped_options.queues = [queue.name]

  new(options: scoped_options, queue: queue)
end

.execute(max:, queue:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object



7
8
9
# File 'lib/sidekiq/web_custom/processor.rb', line 7

# Runs up to +max+ jobs from +queue+ through a freshly built processor.
#
# @param max [Integer] forwarded to the processor's +__execute+
# @param queue [String, #name] forwarded to +__processor__+
# @param options [Object] forwarded to +__processor__+
# @return [Object] whatever the processor's +__execute+ returns
def self.execute(max:, queue:, options: Sidekiq.default_configuration.default_capsule)
  processor = __processor__(queue: queue, options: options)
  processor.__execute(max: max)
end

.execute_job(job:, options: Sidekiq.default_configuration.default_capsule) ⇒ Object



11
12
13
14
15
# File 'lib/sidekiq/web_custom/processor.rb', line 11

# Runs a single job through a processor built for that job's queue.
#
# Any StandardError raised while building the processor or executing the
# job is swallowed and reported as +false+.
#
# @param job [#queue] the job to run; its +queue+ selects the processor queue
# @param options [Object] forwarded to +__processor__+
# @return [Object, false] the result of +__execute_job+, or +false+ on StandardError
def self.execute_job(job:, options: Sidekiq.default_configuration.default_capsule)
  processor = __processor__(queue: job.queue, options: options)
  processor.__execute_job(job: job)
rescue StandardError
  # Nothing is logged here: __execute_job logs processing and dequeue
  # failures itself before re-raising them.
  false
end

Instance Method Details

#__execute(max:) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/sidekiq/web_custom/processor.rb', line 55

# Processes up to +max+ jobs from the bound queue, one at a time.
#
# The run ends early when the queue's size is no longer positive or when
# the current thread's Sidekiq::WebCustom::BREAK_BIT entry is truthy.
# Every exception raised during the run (not only StandardError) is caught,
# logged, and converted into a normal return.
#
# @param max [Integer] upper bound on the number of jobs to attempt
# @return [Integer] how many +process_one+ calls completed before stopping
def __execute(max:)
  processed = 0

  max.times do
    break unless @__queue.size.positive?

    if Thread.current[Sidekiq::WebCustom::BREAK_BIT]
      logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{processed} of attempted #{max}"
      break
    end

    logger.info { "Manually processing next item in queue:[#{@__queue.name}]" }
    process_one
    processed += 1
  end

  processed
rescue Exception => error
  # A job is reported as lost only when one is still held in @job and the
  # fetcher was detected as BasicFetch at construction time.
  job_lost = @job && @__basic_fetch
  logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}" if job_lost

  logger.warn "Manual execution has terminated. Received error [#{error.message}]"
  processed
end

#__execute_job(job:) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sidekiq/web_custom/processor.rb', line 33

# Processes one specific job and, if that succeeds, deletes it from its queue.
#
# The job's item is serialized with +to_json+ and wrapped in a
# Sidekiq::BasicFetch::UnitOfWork keyed by "queue:<name>". A failure during
# processing is logged and re-raised before +job.delete+ is called; a failure
# during deletion is logged at fatal level and re-raised.
#
# @param job [#queue, #item, #delete] the job to run and then remove
# @return [true] when both processing and deletion complete
# @raise [StandardError] whatever +process+ or +job.delete+ raised
def __execute_job(job:)
  work_unit = Sidekiq::BasicFetch::UnitOfWork.new("queue:#{job.queue}", job.item.to_json)

  begin
    logger.info "Manually processing individual work unit for #{work_unit.queue_name}"
    process(work_unit)
  rescue StandardError => process_error
    logger.error "Manually processed work unit failed with #{process_error.message}. Work unit will not be dequeued"
    raise
  end

  begin
    job.delete
    logger.info { "Manually processed work unit sucessfully dequeued." }
  rescue StandardError => delete_error
    logger.fatal "Manually processed work unit failed to be dequeued. #{delete_error.message}."
    raise
  end

  true
end