Class: Resqued::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/resqued/worker.rb

Overview

Models a worker process.

Constant Summary collapse

# Default factory for the object that does the work for a set of queues.
# Called with the list of queue names; returns the Resque::Worker it built.
DEFAULT_WORKER_FACTORY =
  lambda do |queues|
    # resque is loaded lazily, when the factory is first invoked.
    require "resque"

    worker = Resque::Worker.new(*queues)
    # Only set term_child when the worker exposes the setter.
    worker.term_child = true if worker.respond_to?("term_child=")

    # The client is read from `_client` when the redis object responds to it,
    # otherwise from `client`.
    client =
      if Resque.redis.respond_to?(:_client)
        Resque.redis._client
      else
        Resque.redis.client
      end
    # Reconnect when the client supports it; otherwise just close it.
    client.respond_to?(:reconnect) ? client.reconnect : client.close

    worker
  end

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Worker

Returns a new instance of Worker.



24
25
26
27
28
29
30
31
# File 'lib/resqued/worker.rb', line 24

# Build a worker from an options hash.
#
# options - Hash of settings:
#           :queues         - required; the queues this worker handles.
#           :config         - required; stored and later sent after_fork / after_exit.
#           :interval       - optional; stored as-is (nil when absent).
#           :worker_factory - optional; defaults to DEFAULT_WORKER_FACTORY.
#
# Raises KeyError when :queues or :config is missing.
def initialize(options)
  @queues = options.fetch(:queues)
  @config = options.fetch(:config)
  @worker_factory = options.fetch(:worker_factory) { DEFAULT_WORKER_FACTORY }
  @interval = options[:interval]
  # Every pid this object has been associated with, in order.
  @pids = []
  @backoff = Backoff.new
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Public: The pid of the worker process.



34
35
36
# File 'lib/resqued/worker.rb', line 34

# Public: The pid currently associated with this worker, or nil when there is
# none. It is assigned in try_start (the forked child's pid) and in wait_for
# (a pid handed in by the caller), and reset to nil in finished!.
def pid
  @pid
end

#queues ⇒ Object (readonly)

Private: The queues this worker was configured with.



37
38
39
# File 'lib/resqued/worker.rb', line 37

# Private: The queues passed to the constructor as options[:queues]. They are
# handed to the worker factory in try_start and hashed by queue_key.
def queues
  @queues
end

Instance Method Details

#backing_off_for ⇒ Object

Public: The amount of time we need to wait before starting a new worker.



83
84
85
# File 'lib/resqued/worker.rb', line 83

# Public: How long to wait before a new worker may be started.
#
# Returns nil while a pid is assigned to this worker; otherwise returns
# whatever the backoff tracker reports from how_long?.
def backing_off_for
  return if @pid

  @backoff.how_long?
end

#finished!(process_status) ⇒ Object

Public: The old worker process finished!



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/resqued/worker.rb', line 64

# Public: Record that the process behind this worker has gone away.
#
# process_status - nil, or a status object for the exited process.
#
# Two combinations are accepted as a real finish:
# * a status is given and this object forked the process itself: the
#   after_exit hook receives a WorkerSummary with the process lifetime;
# * no status is given and the process was adopted through wait_for.
# In both cases the pid is cleared and, unless kill was called, the backoff
# is told the worker died. Any other combination is only logged.
def finished!(process_status)
  tag = "(#{@pid}/#{@pids.inspect}/self_started=#{@self_started}/killed=#{@killed})"
  status_known = !process_status.nil?

  if status_known && @self_started
    # Lifetime is measured against the monotonic timestamp taken in try_start.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_time
    exit_report = WorkerSummary.new(alive_time_sec: elapsed, process_status: process_status)
    @config.after_exit(exit_report)

    log :debug, "#{tag} I exited: #{process_status}"
    @pid = nil
    @backoff.died unless @killed
  elsif !status_known && !@self_started
    log :debug, "#{tag} I am no longer blocked."
    @pid = nil
    @backoff.died unless @killed
  else
    log :debug, "#{tag} Reports of my death are highly exaggerated (#{process_status.inspect})"
  end
end

#idle? ⇒ Boolean

Public: True if there is no worker process mapped to this object.

Returns:

  • (Boolean)


40
41
42
# File 'lib/resqued/worker.rb', line 40

# Public: True when no pid is currently assigned to this worker.
#
# Returns a Boolean.
def idle?
  pid.nil?
end

#kill(signal) ⇒ Object

Public: Shut this worker down.



117
118
119
120
121
122
# File 'lib/resqued/worker.rb', line 117

# Public: Ask this worker's process to shut down.
#
# signal - The signal to send; converted with to_s before being passed to
#          Process.kill.
#
# The signal is only sent when a pid is assigned and this object forked the
# process itself. The worker is then marked as killed, which finished! uses
# to avoid reporting a death to the backoff. When the process no longer
# exists (Errno::ESRCH) the failure is logged and the killed mark is left
# unchanged.
def kill(signal)
  if pid && @self_started
    Process.kill(signal.to_s, pid)
  end
  @killed = true
rescue Errno::ESRCH => error
  log "Can't kill #{pid}: #{error}"
end

#queue_key ⇒ Object

Public: A string key used to tell whether this worker is equivalent to a worker in another Resqued::Listener.



50
51
52
# File 'lib/resqued/worker.rb', line 50

# Public: A key identifying this worker's set of queues.
#
# The queue names are sorted first, so the order they were configured in does
# not affect the key. Returns the SHA-256 hex digest of the names joined
# with ";".
def queue_key
  canonical = queues.sort.join(";")
  Digest::SHA256.hexdigest(canonical)
end

#running_here? ⇒ Boolean

Public: True if this worker is running in this process.

Returns:

  • (Boolean)


45
46
47
# File 'lib/resqued/worker.rb', line 45

# Public: Whether this worker has a process that this object forked itself.
#
# Returns false when the worker is idle; otherwise returns the self-started
# flag, which is true after try_start and false after wait_for.
def running_here?
  return false if idle?

  @self_started
end

#try_start ⇒ Object

Public: Start a worker process for my queues, unless the backoff says to wait.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/resqued/worker.rb', line 88

# Public: Fork a worker process for this worker's queues.
#
# Returns immediately (nil) while the backoff says to wait. Otherwise the
# start is recorded with the backoff, the worker is marked as self-started
# and not killed, a start timestamp is taken, and the process forks.
#
# In the parent, the child's pid is stored in @pid, appended to @pids and
# logged. In the child, signal handlers are reset, a worker is built with the
# worker factory, the config's after_fork hook runs, and the worker's work
# loop is entered; the child exits with status 0 when that loop returns.
def try_start
  return if @backoff.wait?

  @backoff.started
  @self_started = true
  @killed = false
  # Monotonic timestamp; finished! subtracts it to compute the process lifetime.
  @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # fork returns the child's pid in the parent and nil in the child, so the
  # assignment doubles as the parent/child branch.
  if @pid = fork
    @pids << @pid
    # still in the listener
    log "Forked worker #{@pid}"
  else
    # In case we get a signal before resque is ready for it.
    Resqued::Listener::ALL_SIGNALS.each { |signal| trap(signal, "DEFAULT") }
    # Continue ignoring SIGHUP, though.
    trap(:HUP) {}
    # If we get a QUIT during boot, just spin back down.
    trap(:QUIT) { exit! 0 }

    # Rename the process so it is identifiable while the worker boots.
    $0 = "STARTING RESQUE FOR #{queues.join(',')}"
    resque_worker = @worker_factory.call(queues)
    @config.after_fork(resque_worker)
    # Falls back to 5 when no interval was configured (presumably seconds —
    # confirm against the worker's #work signature).
    resque_worker.work(@interval || 5)
    exit 0
  end
end

#wait_for(pid) ⇒ Object

Public: Claim this worker for another listener’s worker.



55
56
57
58
59
60
61
# File 'lib/resqued/worker.rb', line 55

def wait_for(pid)
  raise "Already running #{@pid} (can't wait for #{pid})" if @pid

  @self_started = false
  @pids << pid
  @pid = pid
end