Class: Resque::Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/resque-delayed/worker.rb

Overview

a worker that harvests delayed jobs and queues them

note: this is a modified version of github.com/defunkt/resque/blob/e01bece0ccfd561909333d51b28813d59777183d/lib/resque/worker.rb with nearly everything stripped out of it

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.



90
91
92
# File 'lib/resque-delayed/worker.rb', line 90

# Identifier for this worker in the form "<hostname>:<pid>:resque-delayed".
#
# The string is assembled on the first call and cached in @to_s, so later
# calls return the same String object.
def to_s
  @to_s ||= [hostname, Process.pid, 'resque-delayed'].join(':')
end

#verboseObject

Whether the worker should log basic info to STDOUT



9
10
11
# File 'lib/resque-delayed/worker.rb', line 9

# Reader for the @verbose flag.
#
# Returns whatever is stored in @verbose (nil if it was never assigned).
# When the value is truthy, #log prints messages in its short "*** ..." form.
def verbose
  @verbose
end

#very_verboseObject

Whether the worker should log lots of info to STDOUT



12
13
14
# File 'lib/resque-delayed/worker.rb', line 12

# Reader for the @very_verbose flag.
#
# Returns whatever is stored in @very_verbose (nil if it was never assigned).
# When the value is truthy, #log! forwards its message to #log, and #log
# uses its timestamped form unless #verbose is also set.
def very_verbose
  @very_verbose
end

Instance Method Details

#hostnameObject

chomp’d hostname of this machine



96
97
98
# File 'lib/resque-delayed/worker.rb', line 96

# Hostname of this machine, without a trailing newline.
#
# The value is looked up once via Socket.gethostname and cached in @hostname.
# Using the standard library avoids spawning a subprocess and does not depend
# on a `hostname` executable being present on the PATH.
#
# Returns a String.
def hostname
  @hostname ||= begin
    # Lazy-loaded on first use so this method does not rely on the
    # surrounding file having required 'socket' already.
    require 'socket'
    Socket.gethostname
  end
end

#inspectObject



84
85
86
# File 'lib/resque-delayed/worker.rb', line 84

# Developer-facing description of the worker: the #to_s identifier wrapped
# as "#<Resque::Delayed worker ...>".
def inspect
  format('#<Resque::Delayed worker %s>', to_s)
end

#log(message) ⇒ Object

Log a message to STDOUT if we are verbose or very_verbose.



106
107
108
109
110
111
112
113
# File 'lib/resque-delayed/worker.rb', line 106

# Print a message to STDOUT according to the verbosity flags.
#
# message - the text to print (interpolated into the output line).
#
# When #verbose is truthy the short form "*** <message>" is printed; this
# takes precedence even if #very_verbose is also set. Otherwise, when
# #very_verbose is truthy, the line is prefixed with the current time and
# this process's PID. With neither flag set nothing is printed.
#
# Returns nil.
def log(message)
  return puts("*** #{message}") if verbose
  return unless very_verbose

  stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
  puts "** [#{stamp}] #{Process.pid}: #{message}"
end

#log!(message) ⇒ Object

Logs a very verbose message to STDOUT.



116
117
118
# File 'lib/resque-delayed/worker.rb', line 116

# Forward a message to #log, but only when #very_verbose is truthy.
#
# message - the text to log.
#
# Returns nil when #very_verbose is falsy; otherwise whatever #log returns.
def log!(message)
  return unless very_verbose

  log(message)
end

#pidObject

Returns Integer PID of running worker



101
102
103
# File 'lib/resque-delayed/worker.rb', line 101

# PID of the current process, cached in @pid after the first lookup.
#
# Returns the cached value if one is already set; otherwise stores and
# returns Process.pid.
def pid
  return @pid if @pid

  @pid = Process.pid
end

#register_signal_handlersObject

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/resque-delayed/worker.rb', line 54

# Install the signal handlers this worker responds to.
#
#   TERM, INT - call #shutdown!
#   QUIT      - call #shutdown
#
# Only the QUIT trap is wrapped in a rescue: if the platform rejects that
# signal with an ArgumentError, a warning is emitted and registration
# continues. The TERM and INT traps are installed unconditionally, so the
# warning names QUIT alone.
#
# Logs "Registered signals" through #log! when done.
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT')  { shutdown! }

  begin
    trap('QUIT') { shutdown }
  rescue ArgumentError
    warn "Signal QUIT not supported."
  end

  log! "Registered signals"
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job.



69
70
71
72
# File 'lib/resque-delayed/worker.rb', line 69

# Request that the worker stop.
#
# Emits 'Exiting...' through #log and then sets @shutdown to true, which is
# the flag #shutdown? reports. Nothing is interrupted here; the flag is only
# acted on where #shutdown? is consulted.
#
# Returns true.
def shutdown
  log 'Exiting...'
  @shutdown = true
end

#shutdown!Object

Shut down immediately. In this stripped-down worker the method simply delegates to #shutdown.



75
76
77
# File 'lib/resque-delayed/worker.rb', line 75

# Shutdown entry point used by the TERM and INT signal handlers.
#
# As written this only delegates to #shutdown, so it behaves identically:
# it logs and sets the shutdown flag.
def shutdown!
  shutdown
end

#shutdown?Boolean

Should this worker shutdown as soon as current job is finished?

Returns:

  • (Boolean)


80
81
82
# File 'lib/resque-delayed/worker.rb', line 80

# Has a shutdown been requested for this worker?
#
# Reports the @shutdown flag as a strict Boolean, so the result is false
# (not nil) while the flag has never been set.
#
# Returns true or false.
def shutdown?
  @shutdown ? true : false
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle.



41
42
43
44
45
46
47
# File 'lib/resque-delayed/worker.rb', line 41

def startup
  register_signal_handlers

  # Fix buffering so we can `rake resque-delayed:work > resque-delayed.log` and
  # get output from the worker
  $stdout.sync = true
end

#work(interval = 5.0) ⇒ Object

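Can be passed a float representing the polling interval in seconds, i.e. the pause between harvest passes. The default is 5 seconds, but for a semi-active site you may want to use a smaller value. An interval of 0 performs a single harvest pass and then returns.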



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/resque-delayed/worker.rb', line 19

# Main loop: repeatedly harvest due delayed jobs and enqueue them.
#
# interval - pause between harvest passes, in seconds; anything accepted by
#            Kernel#Float (default: 5.0). With an interval of 0 a single
#            harvest pass is made and the method returns.
#
# Sets the process title, runs #startup, then loops until #shutdown? is
# true. Each pass pulls jobs from Resque::Delayed.next until it returns
# nil/false, handing each [queue, klass, *args] triple to Resque::Job.create.
#
# The shutdown flag is re-checked between jobs and again before sleeping, so
# a shutdown requested mid-harvest takes effect after the job in hand has
# been enqueued rather than after the whole backlog plus one more sleep.
#
# Raises whatever Kernel#Float raises when interval is not numeric.
# Returns nil.
def work(interval = 5.0)
  interval = Float(interval)
  $0 = "resque-delayed: harvesting"
  startup

  loop do
    break if shutdown?

    # Harvest delayed jobs while they are available. A job is only fetched
    # once we know we are not shutting down, so every fetched job is
    # guaranteed to be enqueued.
    until shutdown?
      job = Resque::Delayed.next
      break unless job

      log "got: #{job.inspect}"
      queue, klass, *args = job
      Resque::Job.create(queue, klass, *args)
    end

    # Do not sleep through a pending shutdown request.
    break if shutdown? || interval.zero?

    log! "Sleeping for #{interval} seconds"
    sleep interval
  end
end