Class: Resqued::Worker
- Inherits:
-
Object
- Object
- Resqued::Worker
- Includes:
- Logging
- Defined in:
- lib/resqued/worker.rb
Overview
Models a worker process.
Constant Summary collapse
- DEFAULT_WORKER_FACTORY =
# Default factory used to build the Resque worker for a list of queues.
#
# queues - Array of queue names, splatted into Resque::Worker.new.
#
# Returns the configured Resque::Worker.
lambda do |queues|
  # resque is only loaded when a worker is actually built.
  require "resque"
  resque_worker = Resque::Worker.new(*queues)
  # Only set term_child on Resque versions that expose the setter.
  resque_worker.term_child = true if resque_worker.respond_to?("term_child=")
  # Use `_client` when the redis object responds to it, `client` otherwise.
  redis_client = Resque.redis.respond_to?(:_client) ? Resque.redis._client : Resque.redis.client
  # NOTE(review): presumably this resets the connection so a forked worker
  # does not share the listener's socket - confirm against the caller.
  if redis_client.respond_to?(:reconnect)
    redis_client.reconnect
  else
    redis_client.close
  end
  resque_worker
end
Instance Attribute Summary collapse
-
#pid ⇒ Object
readonly
Public: The pid of the worker process.
-
#queues ⇒ Object
readonly
Private.
Instance Method Summary collapse
-
#backing_off_for ⇒ Object
Public: The amount of time we need to wait before starting a new worker.
-
#finished!(process_status) ⇒ Object
Public: The old worker process finished!
-
#idle? ⇒ Boolean
Public: True if there is no worker process mapped to this object.
-
#initialize(options) ⇒ Worker
constructor
A new instance of Worker.
-
#kill(signal) ⇒ Object
Public: Shut this worker down.
-
#queue_key ⇒ Object
Public: A string used to check whether this worker is equivalent to a worker in another Resqued::Listener.
-
#running_here? ⇒ Boolean
Public: True if this worker is running in this process.
-
#try_start ⇒ Object
Public: Start a job, if there’s one waiting in one of my queues.
-
#wait_for(pid) ⇒ Object
Public: Claim this worker for another listener’s worker.
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ Worker
Returns a new instance of Worker.
24 25 26 27 28 29 30 31 |
# File 'lib/resqued/worker.rb', line 24

# Build a new Worker from an options hash.
#
# options - Hash of settings:
#           :queues         - required (fetched with no default).
#           :config         - required (fetched with no default).
#           :interval       - optional; nil when absent.
#           :worker_factory - optional; defaults to DEFAULT_WORKER_FACTORY.
#
# Raises KeyError if :queues or :config is missing.
def initialize(options)
  @queues = options.fetch(:queues)
  @config = options.fetch(:config)
  @interval = options[:interval]
  @backoff = Backoff.new
  @worker_factory = options.fetch(:worker_factory, DEFAULT_WORKER_FACTORY)
  # Every pid this object has been associated with (appended to on fork/wait_for).
  @pids = []
end
Instance Attribute Details
#pid ⇒ Object (readonly)
Public: The pid of the worker process.
34 35 36 |
# File 'lib/resqued/worker.rb', line 34

# Public: The pid of the worker process.
#
# Returns the current value of @pid (nil when no pid is assigned).
def pid
  @pid
end
#queues ⇒ Object (readonly)
Private.
37 38 39 |
# File 'lib/resqued/worker.rb', line 37

# Private: The queues given to this worker at construction.
#
# Returns the current value of @queues.
def queues
  @queues
end
Instance Method Details
#backing_off_for ⇒ Object
Public: The amount of time we need to wait before starting a new worker.
83 84 85 |
# File 'lib/resqued/worker.rb', line 83

# Public: The amount of time we need to wait before starting a new worker.
#
# Returns nil while a pid is assigned; otherwise returns whatever
# @backoff.how_long? reports.
def backing_off_for
  @pid ? nil : @backoff.how_long?
end
#finished!(process_status) ⇒ Object
Public: The old worker process finished!
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/resqued/worker.rb', line 64

# Public: The old worker process finished!
#
# process_status - the exit status of the finished process, or nil.
#
# Three cases are distinguished:
# * nil status and the process was not started here: clear the pid.
# * non-nil status and the process was started here: report the exit to the
#   config via after_exit, then clear the pid.
# * anything else: only log; state is left untouched.
#
# In the first two cases @backoff.died is called unless the worker was killed
# through #kill.
def finished!(process_status)
  summary = "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed})"
  if process_status.nil? && !@self_started
    log :debug, "#{summary} I am no longer blocked."
    @pid = nil
    @backoff.died unless @killed
  elsif !process_status.nil? && @self_started
    # Lifetime is measured on the monotonic clock from @start_time.
    alive_time_sec = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time
    @config.after_exit(WorkerSummary.new(alive_time_sec: alive_time_sec, process_status: process_status))
    log :debug, "#{summary} I exited: #{process_status}"
    @pid = nil
    @backoff.died unless @killed
  else
    log :debug, "#{summary} Reports of my death are highly exaggerated (#{process_status.inspect})"
  end
end
#idle? ⇒ Boolean
Public: True if there is no worker process mapped to this object.
40 41 42 |
# File 'lib/resqued/worker.rb', line 40

# Public: True if there is no worker process mapped to this object.
#
# Returns true when #pid is nil, false otherwise.
def idle?
  pid.nil?
end
#kill(signal) ⇒ Object
Public: Shut this worker down.
117 118 119 120 121 122 |
# File 'lib/resqued/worker.rb', line 117

# Public: Shut this worker down.
#
# signal - the signal to send; converted with to_s before being passed to
#          Process.kill.
#
# The signal is only sent when a pid is assigned and the process was started
# by this object. @killed is set afterwards even if no signal was sent.
# If the process no longer exists (Errno::ESRCH) the error is logged and
# @killed is left unchanged.
def kill(signal)
  Process.kill(signal.to_s, pid) if pid && @self_started
  @killed = true
rescue Errno::ESRCH => e
  log "Can't kill #{pid}: #{e}"
end
#queue_key ⇒ Object
Public: A string used to check whether this worker is equivalent to a worker in another Resqued::Listener.
50 51 52 |
# File 'lib/resqued/worker.rb', line 50

# Public: A string used to check whether this worker is equivalent to a
# worker in another Resqued::Listener.
#
# The queue names are sorted before hashing, so the key does not depend on
# the order in which the queues were given.
#
# Returns the SHA-256 hex digest of the sorted queue names joined by ";".
def queue_key
  Digest::SHA256.hexdigest(queues.sort.join(";"))
end
#running_here? ⇒ Boolean
Public: True if this worker is running in this process.
45 46 47 |
# File 'lib/resqued/worker.rb', line 45

# Public: True if this worker is running in this process.
#
# Returns a truthy value only when the worker is not idle and the process
# was started by this object (@self_started).
def running_here?
  !idle? && @self_started
end
#try_start ⇒ Object
Public: Start a job, if there’s one waiting in one of my queues.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/resqued/worker.rb', line 88

# Public: Start a job, if there's one waiting in one of my queues.
#
# Does nothing (returns nil) while the backoff says to wait. Otherwise it
# records the start, forks, and:
# * in the parent, remembers the child pid and logs it;
# * in the child, resets signal handlers, builds the worker through the
#   configured factory, runs the after_fork hook, works the queues and exits.
def try_start
  return if @backoff.wait?

  @backoff.started
  @self_started = true
  @killed = false
  @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  if @pid = fork
    @pids << @pid
    # still in the listener
    log "Forked worker #{@pid}"
  else
    # In case we get a signal before resque is ready for it.
    Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, "DEFAULT") }
    # Continue ignoring SIGHUP, though.
    trap(:HUP) {}
    # If we get a QUIT during boot, just spin back down.
    trap(:QUIT) { exit! 0 }
    $0 = "STARTING RESQUE FOR #{queues.join(',')}"
    resque_worker = @worker_factory.call(queues)
    @config.after_fork(resque_worker)
    # Poll interval falls back to 5 when none was configured.
    resque_worker.work(@interval || 5)
    exit 0
  end
end
#wait_for(pid) ⇒ Object
Public: Claim this worker for another listener’s worker.
55 56 57 58 59 60 61 |
# File 'lib/resqued/worker.rb', line 55

# Public: Claim this worker for another listener's worker.
#
# pid - the pid of the process to wait for.
#
# Marks the process as not started here, appends the pid to @pids and
# assigns it as the current pid.
#
# Returns the pid.
# Raises RuntimeError if a pid is already assigned.
def wait_for(pid)
  raise "Already running #{@pid} (can't wait for #{pid})" if @pid

  @self_started = false
  @pids << pid
  @pid = pid
end