Module: Sidekiq::Sqs::Manager

Extended by:
ActiveSupport::Concern
Defined in:
lib/sidekiq-sqs/manager.rb

Instance Method Summary collapse

Instance Method Details

#assign(msg, queue) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/sidekiq-sqs/manager.rb', line 17

# Hands a fetched message to an idle processor, or releases the message
# when the manager has already been stopped.
#
# @param msg [Object] the fetched message; must respond to
#   +visibility_timeout=+ (presumably an SQS message -- confirm with caller)
# @param queue [Object] the queue the message was fetched from
# @return [Object] whatever +watchdog+ returns for the block
def assign(msg, queue)
  watchdog("Manager#assign died") do
    # A fetch may complete after Manager#stop has already shut down every
    # ready processor, so there is nobody left to run the job. Zero the
    # visibility timeout instead of processing it.
    # NOTE(review): assumed to make the message immediately receivable
    # again by other consumers -- confirm against the SQS client in use.
    next(msg.visibility_timeout = 0) if stopped?

    worker = @ready.pop
    # Remember what this worker is running, keyed by its object id.
    @in_progress[worker.object_id] = [msg, queue]
    @busy << worker
    worker.process!(msg, queue)
  end
end

#initialize_with_sqs(options = {}) ⇒ Object



11
12
13
14
15
# File 'lib/sidekiq-sqs/manager.rb', line 11

# Runs the pre-existing initializer (+initialize_without_sqs+) and then
# replaces +@fetcher+ with a pool built by +Sidekiq::Fetcher.pool+.
#
# @param options [Hash] manager options, forwarded untouched to
#   +initialize_without_sqs+; +:queues+ and +:strict+ are also read here
# @return [Object] the fetcher pool assigned to +@fetcher+
def initialize_with_sqs(options = {})
  initialize_without_sqs(options)

  # Each pooled fetcher receives this actor, the queue list, and a strict
  # flag coerced to a real boolean.
  fetcher_args = [
    current_actor,
    options[:queues],
    options[:strict] ? true : false
  ]
  @fetcher = Sidekiq::Fetcher.pool(args: fetcher_args)
end

#stop(options = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/sidekiq-sqs/manager.rb', line 35

# Stops the manager: flags it as done, tells the fetcher to stop,
# terminates idle processors, removes this process's entries from the
# 'workers' set in Redis, and then either signals shutdown right away
# (no busy processors) or optionally schedules a hard shutdown.
#
# @param options [Hash]
# @option options [Boolean] :shutdown when truthy, +hard_shutdown_in+ is
#   called with the timeout if processors are still busy
# @option options [Numeric] :timeout value passed to +hard_shutdown_in+
#   and reported in the log message
# @return [Object] the result of +after+ when nothing is busy; otherwise
#   whatever +watchdog+ returns for the block
def stop(options={})
  watchdog('Manager#stop died') do
    shutdown = options[:shutdown]
    timeout = options[:timeout]

    @done = true
    Sidekiq::Fetcher.done!
    # NOTE: explicit fetcher finalization is currently disabled.
    # @fetcher.finalize

    # Idle processors have nothing in flight, so terminate them now.
    logger.info { "Shutting down #{@ready.size} quiet workers" }
    @ready.each do |idle|
      idle.terminate if idle.alive?
    end
    @ready.clear

    # Drop only the 'workers' entries whose name contains this process id.
    logger.debug { "Clearing workers in redis" }
    Sidekiq.redis do |redis|
      redis.smembers('workers').each do |worker_name|
        next unless worker_name =~ /:#{process_id}-/

        redis.srem('workers', worker_name)
      end
    end

    # With nothing busy there is nothing to wait for.
    if @busy.empty?
      return after(0) { signal(:shutdown) }
    end

    logger.info { "Pausing up to #{timeout} seconds to allow workers to finish..." }
    hard_shutdown_in timeout if shutdown
  end
end