Class: Resqued::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/resqued/worker.rb

Overview

Models a worker process.

Constant Summary collapse

DEFAULT_WORKER_FACTORY =
->(queues) {
  resque_worker = Resque::Worker.new(*queues)
  resque_worker.term_child = true if resque_worker.respond_to?('term_child=')
  Resque.redis.client.reconnect
  resque_worker
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Worker

Returns a new instance of Worker.



18
19
20
21
22
23
24
25
# File 'lib/resqued/worker.rb', line 18

def initialize(options)
  @queues = options.fetch(:queues)
  @config = options.fetch(:config)
  @interval = options[:interval]
  @backoff = Backoff.new
  @worker_factory = options.fetch(:worker_factory, DEFAULT_WORKER_FACTORY)
  @pids = []
end

Instance Attribute Details

#pidObject (readonly)

Public: The pid of the worker process.



28
29
30
# File 'lib/resqued/worker.rb', line 28

def pid
  @pid
end

#queuesObject (readonly)

Private.



31
32
33
# File 'lib/resqued/worker.rb', line 31

def queues
  @queues
end

Instance Method Details

#backing_off_forObject

Public: The amount of time we need to wait before starting a new worker.



72
73
74
# File 'lib/resqued/worker.rb', line 72

def backing_off_for
  @pid ? nil : @backoff.how_long?
end

#finished!(process_status) ⇒ Object

Public: The old worker process finished!



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/resqued/worker.rb', line 57

def finished!(process_status)
  if process_status.nil? && ! @self_started
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) I am no longer blocked."
    @pid = nil
    @backoff.died unless @killed
  elsif ! process_status.nil? && @self_started
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) I exited: #{process_status}"
    @pid = nil
    @backoff.died unless @killed
  else
    log :debug, "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed}) Reports of my death are highly exaggerated (#{process_status.inspect})"
  end
end

#idle?Boolean

Public: True if there is no worker process mapped to this object.

Returns:

  • (Boolean)


34
35
36
# File 'lib/resqued/worker.rb', line 34

def idle?
  pid.nil?
end

#kill(signal) ⇒ Object

Public: Shut this worker down.



99
100
101
102
103
104
# File 'lib/resqued/worker.rb', line 99

def kill(signal)
  Process.kill(signal.to_s, pid) if pid && @self_started
  @killed = true
rescue Errno::ESRCH => e
  log "Can't kill #{pid}: #{e}"
end

#queue_keyObject

Public: A string that compares if this worker is equivalent to a worker in another Resqued::Listener.



44
45
46
# File 'lib/resqued/worker.rb', line 44

def queue_key
  queues.sort.join(';')
end

#running_here?Boolean

Public: True if this worker is running in this process.

Returns:

  • (Boolean)


39
40
41
# File 'lib/resqued/worker.rb', line 39

def running_here?
  !idle? && @self_started
end

#try_startObject

Public: Start a job, if there’s one waiting in one of my queues.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/resqued/worker.rb', line 77

def try_start
  return if @backoff.wait?
  @backoff.started
  @self_started = true
  @killed = false
  if @pid = fork
    @pids << @pid
    # still in the listener
    log "Forked worker #{@pid}"
  else
    # In case we get a signal before resque is ready for it.
    Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, 'DEFAULT') }
    trap(:QUIT) { exit! 0 } # If we get a QUIT during boot, just spin back down.
    $0 = "STARTING RESQUE FOR #{queues.join(',')}"
    resque_worker = @worker_factory.call(queues)
    @config.after_fork(resque_worker)
    resque_worker.work(@interval || 5)
    exit 0
  end
end

#wait_for(pid) ⇒ Object

Public: Claim this worker for another listener’s worker.



49
50
51
52
53
54
# File 'lib/resqued/worker.rb', line 49

def wait_for(pid)
  raise "Already running #{@pid} (can't wait for #{pid})" if @pid
  @self_started = false
  @pids << pid
  @pid = pid
end