Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker.rb

Constant Summary collapse

@@sleep_delay =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



36
37
38
39
40
41
42
43
44
45
# File 'lib/delayed/worker.rb', line 36

def initialize(options={})
  @quiet = options[:quiet]
  Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
  Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
  self.worker_count = options[:worker_count] || 1

  @ready_queue = SizedQueue.new(1)
  @work_queue = Queue.new
  @results_queue = Queue.new
end

Instance Attribute Details

#name_prefixObject

name_prefix is ignored if name is set directly



15
16
17
# File 'lib/delayed/worker.rb', line 15

def name_prefix
  @name_prefix
end

#worker_countObject

Returns the value of attribute worker_count.



16
17
18
# File 'lib/delayed/worker.rb', line 16

def worker_count
  @worker_count
end

Instance Method Details

#job_max_run_timeObject



18
19
20
# File 'lib/delayed/worker.rb', line 18

def job_max_run_time
  Delayed::Job.max_run_time
end

#nameObject

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker retarts: Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



25
26
27
28
# File 'lib/delayed/worker.rb', line 25

def name
  return @name unless @name.nil?
  "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end

#name=(val) ⇒ Object

Sets the name of the worker. Setting the name to nil will reset the default worker name



32
33
34
# File 'lib/delayed/worker.rb', line 32

def name=(val)
  @name = val
end

#say(text, level = Logger::INFO) ⇒ Object



96
97
98
99
# File 'lib/delayed/worker.rb', line 96

def say(text, level = Logger::INFO)
  puts text unless @quiet
  logger.add level, text if logger
end

#startObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/delayed/worker.rb', line 47

def start
  say "*** Starting job worker #{name}"

  trap('TERM') { say 'Exiting...'; $exit = true }
  trap('INT')  { say 'Exiting...'; $exit = true }

  consumers = []
  1.upto(worker_count) {|i| consumers << start_consumer(i) }

  monitor = Thread.new { monitor_consumers(consumers) }

  loop do
    result = nil

    realtime = Benchmark.realtime do
      result = work_off
    end

    count = result.sum

    break if $exit

    if count.zero?
      sleep(@@sleep_delay)
    else
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
    end

    break if $exit
  end

  worker_count.times {|i| @work_queue << nil }
  @ready_queue.pop while !@ready_queue.empty?
  monitor.run if monitor.status
  monitor.value
  consumers.each do |c|
    begin
      c.value
    rescue Exception => e
      say e, Logger::ERROR
    end
  end

ensure
  1.upto(worker_count) do |i|
    Delayed::Job.clear_locks!("#{name} #{i}")
  end
end