Module: Sidekiq::Sqs::Manager
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/sidekiq-sqs/manager.rb
Instance Method Summary
- #assign(msg, queue) ⇒ Object
- #initialize_with_sqs(options = {}) ⇒ Object
- #stop(options = {}) ⇒ Object
Instance Method Details
#assign(msg, queue) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/sidekiq-sqs/manager.rb', line 17

# Hands a fetched message to a ready processor, or releases the message
# when the manager has already been stopped.
#
# @param msg [Object] the fetched message; must respond to
#   +visibility_timeout=+ (presumably an SQS message -- confirm with caller)
# @param queue [Object] the queue the message came from; recorded in
#   +@in_progress+ and passed through to the processor unchanged
# @return [Object] whatever +watchdog+ returns for the block
def assign(msg, queue)
  watchdog("Manager#assign died") do
    if stopped?
      # Race condition between Manager#stop if Fetcher
      # is blocked and gets a message after all the ready
      # Processors have been stopped.
      #
      # NOTE(review): the original comment said "push the message back to
      # redis", but the code only resets the visibility timeout; presumably
      # this makes the SQS message immediately receivable again -- confirm
      # against the SQS client in use.
      msg.visibility_timeout = 0
    else
      processor = @ready.pop
      # Track the in-flight work by processor identity, then mark it busy
      # before handing the message over.
      @in_progress[processor.object_id] = [msg, queue]
      @busy << processor
      processor.process!(msg, queue)
    end
  end
end
#initialize_with_sqs(options = {}) ⇒ Object
11 12 13 14 15 |
# File 'lib/sidekiq-sqs/manager.rb', line 11

# Runs the wrapped initializer and then replaces +@fetcher+ with a
# +Sidekiq::Fetcher+ pool.
#
# @param options [Hash] manager options, forwarded untouched to
#   +initialize_without_sqs+
# @option options [Object] :queues passed to the fetcher as-is
# @option options [Object] :strict coerced to +true+/+false+ for the fetcher
# @return [Object] the fetcher pool assigned to +@fetcher+
def initialize_with_sqs(options = {})
  initialize_without_sqs(options)

  # `!!` normalizes a missing or truthy :strict into a real boolean.
  @fetcher = Sidekiq::Fetcher.pool(args: [current_actor, options[:queues], !!options[:strict]])
end
#stop(options = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/sidekiq-sqs/manager.rb', line 35

# Stops the manager: flags it as done, tells the fetcher to stop, terminates
# idle workers, removes this process's entries from the 'workers' set, and
# then either signals shutdown immediately or waits for busy workers.
#
# @param options [Hash]
# @option options [Object] :shutdown when truthy, a hard shutdown is
#   scheduled if workers are still busy
# @option options [Object] :timeout value handed to +hard_shutdown_in+ and
#   interpolated into the "Pausing" log line
# @return [Object] whatever +watchdog+ returns, or the result of +after+
#   when no workers are busy
def stop(options = {})
  watchdog('Manager#stop died') do
    shutdown = options[:shutdown]
    timeout = options[:timeout]

    @done = true
    Sidekiq::Fetcher.done!
    #@fetcher.finalize

    logger.info { "Shutting down #{@ready.size} quiet workers" }
    @ready.each { |x| x.terminate if x.alive? }
    @ready.clear

    logger.debug { "Clearing workers in redis" }
    Sidekiq.redis do |conn|
      workers = conn.smembers('workers')
      workers.each do |name|
        # Only drop entries whose name embeds this process's id.
        conn.srem('workers', name) if name =~ /:#{process_id}-/
      end
    end

    # Nothing in flight: signal shutdown right away and leave the method.
    return after(0) { signal(:shutdown) } if @busy.empty?

    # NOTE(review): this line is logged even when :shutdown is falsy or
    # :timeout is nil -- confirm that is intended.
    logger.info { "Pausing up to #{timeout} seconds to allow workers to finish..." }
    hard_shutdown_in timeout if shutdown
  end
end