Class: Resque::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/resque_ui/overrides/resque/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(ips, queues) ⇒ Object



234
235
236
237
238
239
240
# File 'lib/resque_ui/overrides/resque/worker.rb', line 234

def self.start(ips, queues)
  if Rails.env =~ /development|test/
    Thread.new(queues) { |queue| system("rake RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work") }
  else
    Thread.new(queues, ips) { |queue, ip_list| system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:work host=#{ip_list} queue=#{queue}") }
  end
end

.workingObject

logic for mappged_mget changed where it returns keys with nil values in latest redis gem.



216
217
218
219
220
221
222
223
# File 'lib/resque_ui/overrides/resque/worker.rb', line 216

def self.working
  names = all
  return [] unless names.any?
  names.map! { |name| "worker:#{name}" }
  redis.mapped_mget(*names).map do |key, value|
    find key.sub("worker:", '') unless value.nil?
  end.compact
end

Instance Method Details

#all_workers_in_pid_workingObject



153
154
155
# File 'lib/resque_ui/overrides/resque/worker.rb', line 153

def all_workers_in_pid_working
  workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? }
end

#continueObject



265
266
267
268
269
270
271
# File 'lib/resque_ui/overrides/resque/worker.rb', line 265

def continue
  if Rails.env =~ /development|test/
    system("kill -CONT  #{self.pid}")
  else
    system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}")
  end
end

#ipObject



43
44
45
# File 'lib/resque_ui/overrides/resque/worker.rb', line 43

def ip
  to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end

#local_ipObject



6
7
8
9
10
11
12
13
14
15
# File 'lib/resque_ui/overrides/resque/worker.rb', line 6

def local_ip
  orig, Socket.do_not_reverse_lookup = Socket.do_not_reverse_lookup, true # turn off reverse DNS resolution temporarily

  UDPSocket.open do |s|
    s.connect '64.233.187.99', 1
    s.addr.last
  end
ensure
  Socket.do_not_reverse_lookup = orig
end

#pauseObject



257
258
259
260
261
262
263
# File 'lib/resque_ui/overrides/resque/worker.rb', line 257

def pause
  if Rails.env =~ /development|test/
    system("kill -USR2  #{self.pid}")
  else
    system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}")
  end
end

#pause_keyObject

When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion



26
27
28
29
# File 'lib/resque_ui/overrides/resque/worker.rb', line 26

def pause_key
  key = to_s.split(':')
  "worker:#{key.first}:#{key.second}:all_workers:paused"
end

#pause_processingObject

Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so UI knows it’s paused too Would prefer to call super but get no superclass method error



95
96
97
98
99
# File 'lib/resque_ui/overrides/resque/worker.rb', line 95

def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
  redis.set(pause_key, Time.now.to_s)
end

#pausedObject



81
82
83
# File 'lib/resque_ui/overrides/resque/worker.rb', line 81

def paused
  redis.get pause_key
end

#paused?Boolean

are we paused? OVERRIDE so UI can tell if we’re paused

Returns:

  • (Boolean)


87
88
89
# File 'lib/resque_ui/overrides/resque/worker.rb', line 87

def paused?
  @paused || paused.present?
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefor will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



120
121
122
123
124
125
126
127
128
# File 'lib/resque_ui/overrides/resque/worker.rb', line 120

def prune_dead_workers
  Worker.all.each do |worker|
    host, pid, thread, queues = worker.id.split(':')
    next unless host.include?(hostname)
    next if worker_pids.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queueObject



35
36
37
# File 'lib/resque_ui/overrides/resque/worker.rb', line 35

def queue
  to_s.split(':').last
end

#queuesObject

OVERRIDE for multithreaded workers



52
53
54
# File 'lib/resque_ui/overrides/resque/worker.rb', line 52

def queues
  Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',')
end

#queues_in_pidObject



47
48
49
# File 'lib/resque_ui/overrides/resque/worker.rb', line 47

def queues_in_pid
  workers_in_pid.collect { |w| w.queue }
end

#quitObject



242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/resque_ui/overrides/resque/worker.rb', line 242

def quit
  if Rails.env =~ /development|test/
    if RUBY_PLATFORM =~ /java/
      #jruby doesn't trap the -QUIT signal
      #-TERM gracefully kills the main pid and does a -9 on the child if there is one.
      #Since jruby doesn't fork a child, the main worker is gracefully killed.
      system("kill -TERM  #{self.pid}")
    else
      system("kill -QUIT  #{self.pid}")
    end
  else
    system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip}")
  end
end

#restartObject



273
274
275
276
277
# File 'lib/resque_ui/overrides/resque/worker.rb', line 273

def restart
  queues = self.queues_in_pid.join('#')
  quit
  self.class.start(self.ip, queues)
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers



75
76
77
78
79
# File 'lib/resque_ui/overrides/resque/worker.rb', line 75

def shutdown
  log 'Exiting...'
  Thread.list.each { |t| t[:shutdown] = true }
  @shutdown = true
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/resque_ui/overrides/resque/worker.rb', line 58

def startup
  enable_gc_optimizations
  if Thread.current == Thread.main
    register_signal_handlers
    prune_dead_workers
  end
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#statusObject



230
231
232
# File 'lib/resque_ui/overrides/resque/worker.rb', line 230

def status
  job['status']
end

#status=(status) ⇒ Object



225
226
227
228
# File 'lib/resque_ui/overrides/resque/worker.rb', line 225

def status=(status)
  data = encode(job.merge('status' => status))
  redis.set("worker:#{self}", data)
end

#threadObject



31
32
33
# File 'lib/resque_ui/overrides/resque/worker.rb', line 31

def thread
  to_s.split(':').third
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.



19
20
21
# File 'lib/resque_ui/overrides/resque/worker.rb', line 19

def to_s
  @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:queues]}"
end

#unpause_processingObject

Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error



104
105
106
107
108
# File 'lib/resque_ui/overrides/resque/worker.rb', line 104

def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
  redis.del(pause_key)
end

#unregister_workerObject

Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key Would prefer to call super but get no superclass method error



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/resque_ui/overrides/resque/worker.rb', line 133

def unregister_worker
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    job.fail(DirtyExit.new)
  end

  redis.srem(:workers, self)
  redis.del("worker:#{self}")
  redis.del("worker:#{self}:started")
  redis.del(pause_key)

  Stat.clear("processed:#{self}")
  Stat.clear("failed:#{self}")
end

#work(interval = 5, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned,

    and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/resque_ui/overrides/resque/worker.rb', line 174

def work(interval = 5, &block)
  $0 = "resque: Starting"
  startup

  loop do
    break if @shutdown || Thread.current[:shutdown]

    if not @paused and job = reserve
      log "got: #{job.inspect}"
      run_hook :before_fork
      working_on job

      if @child = fork
        rand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        Process.wait
      else
        procline "Processing #{job.queue} since #{Time.now.to_i}"
        perform(job, &block)
        exit! unless @cant_fork
      end

      done_working
      @child = nil
    else
      break if interval.to_i == 0
      log! "Sleeping for #{interval.to_i}"
      procline @paused ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval.to_i
    end
  end
  unregister_worker rescue nil
  loop do
    #hang onto the process until all threads are done
    break if all_workers_in_pid_working.blank?
    sleep interval.to_i
  end
ensure
  unregister_worker
end

#workers_in_pidObject



39
40
41
# File 'lib/resque_ui/overrides/resque/worker.rb', line 39

def workers_in_pid
  Array(redis.smembers(:workers)).select { |id| id =~ /\(#{ip}\):#{pid}/ }.map { |id| Resque::Worker.find(id) }.compact
end