Class: Qu::Worker
- Inherits:
-
Object
show all
- Includes:
- Logger
- Defined in:
- lib/qu/worker.rb
Defined Under Namespace
Classes: Abort
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logger
#clean_backtrace, #log_exception, #logger
Constructor Details
#initialize(*queues) ⇒ Worker
Returns a new instance of Worker.
10
11
12
13
14
|
# File 'lib/qu/worker.rb', line 10
# Builds a worker for the given queue names.
#
# The arguments are flattened into a single list. If the last element is a
# Hash it is removed from the list and handed to #attributes=. When no queue
# names remain, the worker falls back to the 'default' queue.
#
# @param queues [Array] queue names (possibly nested), optionally followed
#   by a Hash of attributes
def initialize(*queues)
  @queues = queues.flatten

  # @queues must be assigned before attributes= runs: the attributes hash
  # may itself replace @queues, and the emptiness check below has to see
  # whichever value ends up in the instance variable.
  if @queues.last.is_a?(Hash)
    self.attributes = @queues.pop
  end

  @queues << 'default' if @queues.empty?
end
|
Instance Attribute Details
#queues ⇒ Object
Returns the value of attribute queues.
5
6
7
|
# File 'lib/qu/worker.rb', line 5
# Reader for the @queues instance variable, which #initialize fills with the
# flattened queue names (or 'default' when none were given).
#
# @return [Object] whatever is currently stored in @queues
def queues
@queues
end
|
Instance Method Details
#attributes ⇒ Object
22
23
24
|
# File 'lib/qu/worker.rb', line 22
# Snapshot of this worker's identifying attributes.
#
# @return [Hash] string-keyed hash with the entries 'hostname', 'pid' and
#   'queues', taken from the readers of the same names
def attributes
  snapshot = {}
  snapshot['hostname'] = hostname
  snapshot['pid'] = pid
  snapshot['queues'] = queues
  snapshot
end
|
#attributes=(attrs) ⇒ Object
16
17
18
19
20
|
# File 'lib/qu/worker.rb', line 16
# Bulk-assigns instance variables from a collection of name/value pairs.
#
# Every pair +name, value+ is stored in the instance variable "@name"; no
# filtering of names is performed.
#
# @param attrs [#each] pairs of attribute name and value (typically a Hash)
def attributes=(attrs)
  attrs.each { |name, val| instance_variable_set("@#{name}", val) }
end
|
#handle_signals ⇒ Object
26
27
28
29
30
31
32
33
34
|
# File 'lib/qu/worker.rb', line 26
# Installs handlers for the INT and TERM signals.
#
# Each handler logs the received signal at info level and then raises Abort,
# which #start rescues in order to shut the worker down.
#
# NOTE(review): the handler logs from inside the trap block. If the logger is
# the stdlib Logger on Ruby 2.0+, writing from trap context may be rejected —
# confirm the logger in use tolerates this.
def handle_signals
  logger.debug("Worker #{id} registering traps for INT and TERM signals")

  ['INT', 'TERM'].each do |signal_name|
    trap(signal_name) do
      logger.info("Worker #{id} received #{signal_name}, shutting down")
      raise Abort
    end
  end
end
|
#hostname ⇒ Object
73
74
75
|
# File 'lib/qu/worker.rb', line 73
# Name of the host this worker runs on, memoized in @hostname.
#
# The name is read with Socket.gethostname rather than by spawning the
# external `hostname` command, so no subprocess is needed and a missing
# binary cannot raise. A value already present in @hostname (for example one
# assigned through #attributes=) is returned unchanged.
#
# @return [String] the host name
def hostname
  return @hostname if @hostname

  require 'socket' # loaded lazily; only needed for the first lookup
  @hostname = Socket.gethostname
end
|
#id ⇒ Object
65
66
67
|
# File 'lib/qu/worker.rb', line 65
# Identifier for this worker, memoized in @id.
#
# Built as "hostname:pid:queues", with the queue names joined by commas.
#
# @return [String] the worker identifier
def id
  return @id if @id

  @id = "#{hostname}:#{pid}:#{queues.join(',')}"
end
|
#pid ⇒ Object
69
70
71
|
# File 'lib/qu/worker.rb', line 69
# Process id associated with this worker, memoized in @pid.
#
# Falls back to the current process id when @pid has not been set.
#
# @return [Integer] the stored or current process id
def pid
  return @pid if @pid

  @pid = Process.pid
end
|
#start ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/qu/worker.rb', line 53
# Runs the worker until it is aborted.
#
# Logs the start, installs the signal traps, registers the worker with the
# backend and then calls #work in an endless loop. Whatever ends the loop,
# the worker is unregistered from the backend and a final debug line is
# logged.
#
# @return [nil] when the loop was ended by Abort; any other exception
#   propagates to the caller after the cleanup has run
def start
  logger.warn "Worker #{id} starting"
  handle_signals
  Qu.backend.register_worker(self)
  loop { work }
rescue Abort
  # Abort is raised by the traps installed in #handle_signals; it only ends
  # the loop and is deliberately swallowed here.
ensure
  Qu.backend.unregister_worker(self)
  logger.debug "Worker #{id} done"
end
|
#work ⇒ Object
45
46
47
48
49
50
51
|
# File 'lib/qu/worker.rb', line 45
# Reserves one job for this worker and performs it, logging each step at
# debug level.
#
# NOTE(review): the job returned by Qu.reserve is used without a nil check —
# presumably the default reserve call always yields a job; confirm against
# the backend.
def work
  logger.debug("Worker #{id} waiting for next job")
  reserved = Qu.reserve(self)
  logger.debug("Worker #{id} reserved job #{reserved}")
  reserved.perform
  logger.debug("Worker #{id} completed job #{reserved}")
end
|
#work_off ⇒ Object
36
37
38
39
40
41
42
43
|
# File 'lib/qu/worker.rb', line 36
# Performs jobs one after another until Qu.reserve stops returning one.
#
# Jobs are reserved with :block => false (presumably so that reserve returns
# immediately instead of waiting when nothing is queued — confirm against
# the backend). Each reservation and completion is logged at debug level.
def work_off
  logger.debug "Worker #{id} working off all jobs"
  # Assignment in the condition is intentional: the loop ends as soon as
  # reserve returns nil or false.
  while (job = Qu.reserve(self, :block => false))
    logger.debug "Worker #{id} reserved job #{job}"
    job.perform
    logger.debug "Worker #{id} completed job #{job}"
  end
end
|