Class: Resqued::Worker
- Inherits:
- Object
- Object
- Resqued::Worker
- Includes:
- Logging
- Defined in:
- lib/resqued/worker.rb
Overview
Models a worker process.
Constant Summary
- DEFAULT_WORKER_FACTORY =
  ->(queues) {
    resque_worker = Resque::Worker.new(*queues)
    resque_worker.term_child = true if resque_worker.respond_to?('term_child=')
    Resque.redis.client.reconnect
    resque_worker
  }
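The default factory is a lambda that takes the queue list and returns a worker object, so a replacement passed as the :worker_factory option presumably only needs the same shape. A minimal sketch under that assumption; StubWorker and CUSTOM_WORKER_FACTORY are hypothetical names used so the example runs without Resque or Redis, and are not part of resqued:

```ruby
# StubWorker is a hypothetical stand-in for Resque::Worker.
StubWorker = Struct.new(:queues, :term_child)

# Same shape as DEFAULT_WORKER_FACTORY: takes the queue list, returns a worker.
CUSTOM_WORKER_FACTORY = lambda do |queues|
  worker = StubWorker.new(queues)
  # Mirror the default factory's guard before setting an optional attribute.
  worker.term_child = true if worker.respond_to?(:term_child=)
  worker
end

worker = CUSTOM_WORKER_FACTORY.call(%w[high low])
puts worker.queues.inspect
puts worker.term_child
```

Unlike the default, this sketch does not reconnect to Redis; a real factory running in a forked child would likely still need to.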
Instance Attribute Summary
-
#pid ⇒ Object
readonly
Public: The pid of the worker process.
-
#queues ⇒ Object
readonly
Private.
Instance Method Summary
-
#backing_off_for ⇒ Object
Public: The amount of time we need to wait before starting a new worker.
-
#finished!(process_status) ⇒ Object
Public: The old worker process finished!
-
#idle? ⇒ Boolean
Public: True if there is no worker process mapped to this object.
-
#initialize(options) ⇒ Worker
constructor
A new instance of Worker.
-
#kill(signal) ⇒ Object
Public: Shut this worker down.
-
#queue_key ⇒ Object
Public: A string key used to check whether this worker is equivalent to a worker in another Resqued::Listener.
-
#running_here? ⇒ Boolean
Public: True if this worker is running in this process.
-
#try_start ⇒ Object
Public: Start a job, if there’s one waiting in one of my queues.
-
#wait_for(pid) ⇒ Object
Public: Claim this worker for another listener’s worker.
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/resqued/worker.rb', line 18
def initialize(options)
  @queues = options.fetch(:queues)
  @config = options.fetch(:config)
  @interval = options[:interval]
  @backoff = Backoff.new
  @worker_factory = options.fetch(:worker_factory, DEFAULT_WORKER_FACTORY)
  @pids = []
end
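The constructor reads :queues and :config with fetch and no default, so both are required, while :interval and :worker_factory are optional. A small sketch of that option handling in isolation; read_worker_options and EXAMPLE_DEFAULT_FACTORY are hypothetical names for illustration, not part of resqued:

```ruby
# Hypothetical placeholder for DEFAULT_WORKER_FACTORY.
EXAMPLE_DEFAULT_FACTORY = ->(queues) { queues }

# Hypothetical helper that reads the options hash the same way #initialize does.
def read_worker_options(options)
  {
    queues: options.fetch(:queues),   # required: KeyError if missing
    config: options.fetch(:config),   # required: KeyError if missing
    interval: options[:interval],     # optional: nil if missing
    worker_factory: options.fetch(:worker_factory, EXAMPLE_DEFAULT_FACTORY)
  }
end

parsed = read_worker_options(queues: %w[high low], config: Object.new)
puts parsed[:interval].inspect

begin
  read_worker_options(config: Object.new)
rescue KeyError => e
  puts "missing option: #{e.message}"
end
```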
Instance Attribute Details
#pid ⇒ Object (readonly)
Public: The pid of the worker process.
# File 'lib/resqued/worker.rb', line 28
def pid
  @pid
end
#queues ⇒ Object (readonly)
Private.
# File 'lib/resqued/worker.rb', line 31
def queues
  @queues
end
Instance Method Details
#backing_off_for ⇒ Object
Public: The amount of time we need to wait before starting a new worker.
# File 'lib/resqued/worker.rb', line 72
def backing_off_for
  @pid ? nil : @backoff.how_long?
end
#finished!(process_status) ⇒ Object
Public: The old worker process finished!
# File 'lib/resqued/worker.rb', line 57
def finished!(process_status)
  if process_status.nil? && ! @self_started
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) I am no longer blocked."
    @pid = nil
    @backoff.died unless @killed
  elsif ! process_status.nil? && @self_started
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) I exited: #{process_status}"
    @pid = nil
    @backoff.died unless @killed
  else
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) Reports of my death are highly exaggerated (#{process_status.inspect})"
  end
end
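The three branches of finished! depend only on whether a process status was supplied and whether this object started the process itself. The sketch below restates that decision as a pure function; finished_outcome and its symbols are hypothetical names for illustration, and the comments are one reading of the log messages rather than documented behavior:

```ruby
# Hypothetical helper (not part of resqued) naming the branches of #finished!.
def finished_outcome(process_status, self_started)
  if process_status.nil? && !self_started
    :unblocked # no status, process not started here: "I am no longer blocked."
  elsif !process_status.nil? && self_started
    :exited    # status for a process started here: "I exited: ..."
  else
    :ignored   # mismatched report: pid and backoff are left untouched
  end
end

puts finished_outcome(nil, false)
puts finished_outcome(:some_status, true)
puts finished_outcome(nil, true)
puts finished_outcome(:some_status, false)
```

In the first two cases the real method also clears @pid and records a death with the backoff unless the worker was killed on purpose.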
#idle? ⇒ Boolean
Public: True if there is no worker process mapped to this object.
# File 'lib/resqued/worker.rb', line 34
def idle?
  pid.nil?
end
#kill(signal) ⇒ Object
Public: Shut this worker down.
# File 'lib/resqued/worker.rb', line 99
def kill(signal)
  Process.kill(signal.to_s, pid) if pid && @self_started
  @killed = true
rescue Errno::ESRCH => e
  log "Can't kill #{pid}: #{e}"
end
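The rescue in kill covers the case where the process has already gone away by the time the signal is sent. A rough sketch of that pattern outside the class, assuming a Unix-like system with a sleep executable on the PATH; signal_process and demo_signals are hypothetical helpers, and the child is started with Process.spawn here rather than fork purely to keep the example simple:

```ruby
# Hypothetical helper mirroring the rescue in #kill. Returns true when the
# signal was sent and false when no such process exists.
def signal_process(signal, pid)
  Process.kill(signal.to_s, pid)
  true
rescue Errno::ESRCH
  false
end

# Signal a live child, reap it, then try to signal it again.
def demo_signals
  child = Process.spawn("sleep", "60")
  delivered = signal_process(:TERM, child) # child is alive
  Process.wait(child)                      # reap it so the pid is released
  gone = signal_process(:TERM, child)      # Errno::ESRCH is rescued here
  [delivered, gone]
end

p demo_signals
```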
#queue_key ⇒ Object
Public: A string key used to check whether this worker is equivalent to a worker in another Resqued::Listener.
# File 'lib/resqued/worker.rb', line 44
def queue_key
  queues.sort.join(';')
end
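Because the queue names are sorted before joining, two workers configured with the same queues in a different order produce the same key. A stand-alone illustration; example_queue_key is a hypothetical free function that copies the expression above:

```ruby
# Hypothetical free-function copy of the expression used by #queue_key.
def example_queue_key(queues)
  queues.sort.join(';')
end

puts example_queue_key(%w[low high]) # prints "high;low"
puts example_queue_key(%w[high low]) # prints "high;low"
```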
#running_here? ⇒ Boolean
Public: True if this worker is running in this process.
# File 'lib/resqued/worker.rb', line 39
def running_here?
  !idle? && @self_started
end
#try_start ⇒ Object
Public: Start a job, if there’s one waiting in one of my queues.
# File 'lib/resqued/worker.rb', line 77
def try_start
  return if @backoff.wait?
  @backoff.started
  @self_started = true
  @killed = false
  if @pid = fork
    @pids << @pid
    # still in the listener
    log "Forked worker #{@pid}"
  else
    # In case we get a signal before resque is ready for it.
    Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, 'DEFAULT') }
    trap(:QUIT) { exit! 0 } # If we get a QUIT during boot, just spin back down.
    $0 = "STARTING RESQUE FOR #{queues.join(',')}"
    resque_worker = @worker_factory.call(queues)
    @config.after_fork(resque_worker)
    resque_worker.work(@interval || 5)
    exit 0
  end
end
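The method relies on fork returning the child's pid in the parent and nil in the child, which is what splits the listener branch from the worker branch. A reduced sketch of that split, assuming a platform where fork is available; spawn_child and demo_spawn are hypothetical helpers, and the child uses exit! instead of exit so that no at_exit handlers inherited from the parent run:

```ruby
# Hypothetical helper: run the given block in a child process and return the
# child's pid to the parent.
def spawn_child(title)
  if (pid = fork)
    pid                      # parent: fork returned the child's pid
  else
    trap(:QUIT) { exit!(0) } # child: a QUIT during startup just exits
    $0 = title               # rename the process, as try_start does
    yield
    exit!(0)
  end
end

# Start a short-lived child and return its exit status once it is reaped.
def demo_spawn
  pid = spawn_child("EXAMPLE CHILD") { sleep 0.1 }
  _, status = Process.wait2(pid)
  status.exitstatus
end

puts demo_spawn
```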
#wait_for(pid) ⇒ Object
Public: Claim this worker for another listener’s worker.
# File 'lib/resqued/worker.rb', line 49
def wait_for(pid)
  raise "Already running #{@pid} (can't wait for #{pid})" if @pid
  @self_started = false
  @pids << pid
  @pid = pid
end
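Taken together, idle?, running_here? and wait_for describe a slot that is empty, occupied by a process started here, or occupied by a pid that belongs to another listener. A reduced model of the third state; WorkerSlot is a hypothetical class that copies only the state handling shown above and leaves out logging, backoff and forking:

```ruby
# Hypothetical, reduced model of the pid bookkeeping in Resqued::Worker.
class WorkerSlot
  attr_reader :pid

  def initialize
    @pid = nil
    @self_started = false
    @pids = []
  end

  def idle?
    pid.nil?
  end

  def running_here?
    !idle? && @self_started
  end

  # Claim this slot for a pid that some other listener started.
  def wait_for(pid)
    raise "Already running #{@pid} (can't wait for #{pid})" if @pid
    @self_started = false
    @pids << pid
    @pid = pid
  end
end

slot = WorkerSlot.new
puts slot.idle?         # prints "true"
slot.wait_for(4242)
puts slot.idle?         # prints "false"
puts slot.running_here? # prints "false"
```

A slot claimed this way is busy but not running here, which is why kill in the real class only signals the pid when the process was started locally.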