Module: QueueKit::Worker
- Defined in:
- lib/queue_kit/worker.rb
Defined Under Namespace
Classes: PutsInstrumenter
Instance Method Summary
collapse
Instance Method Details
#cool ⇒ Object
22
23
|
# File 'lib/queue_kit/worker.rb', line 22
# Default "cooler" hook. #initialize stores method(:cool) as @cooler when no
# :cooler option is supplied, and #work! calls @cooler when the queue pop
# returns nothing. This default does no work at all.
#
# Returns nil.
def cool
  nil
end
|
#debug ⇒ Object
105
106
|
# File 'lib/queue_kit/worker.rb', line 105
# Debug hook used when debugging is off: any block given is ignored (never
# yielded to) and nothing is instrumented. #initialize re-points this name at
# #force_debug on the instance when the :debug option is truthy.
#
# Returns nil.
def debug
  nil
end
|
#force_debug ⇒ Object
101
102
103
|
# File 'lib/queue_kit/worker.rb', line 101
# Active debug implementation: evaluates the given block and forwards its
# result to #instrument. The block may return a single event name or an
# array of [name, payload]; the result is splatted into the call.
#
# Returns whatever #instrument returns.
def force_debug
  event = yield
  instrument(*event)
end
|
#handle_error(err) ⇒ Object
25
26
27
|
# File 'lib/queue_kit/worker.rb', line 25
# Default error handler. #initialize stores method(:handle_error) as
# @error_handler when no :error_handler option is supplied; this default
# simply re-raises the error it is given.
#
# err - the exception object to raise.
#
# Raises err.
def handle_error(err)
  raise(err)
end
|
#initialize(queue, options = {}) ⇒ Object
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
# File 'lib/queue_kit/worker.rb', line 3
# Sets up a worker around a queue. The worker starts in the stopped state.
#
# queue   - the object #work! pops items from (via #pop).
# options - Hash of optional collaborators:
#           :processor     - called with each popped item
#                            (default: method(:process)).
#           :cooler        - called when the queue pop returns nothing
#                            (default: method(:cool)).
#           :error_handler - called with any exception rescued by #wrap_error
#                            (default: method(:handle_error)).
#           :instrumenter  - receives #instrument(name, payload) calls
#                            (default: PutsInstrumenter.new).
#           :debug         - when truthy, #debug becomes #force_debug for
#                            this instance only (default: false).
def initialize(queue, options = {})
  @queue         = queue
  @processor     = options.fetch(:processor)     { method(:process) }
  @cooler        = options.fetch(:cooler)        { method(:cool) }
  @error_handler = options.fetch(:error_handler) { method(:handle_error) }
  @instrumenter  = options.fetch(:instrumenter)  { PutsInstrumenter.new }
  @stopped       = true

  # Swap the no-op #debug for the instrumenting version on this object's
  # singleton class, so other instances are unaffected.
  return unless options.fetch(:debug, false)
  singleton_class.send(:alias_method, :debug, :force_debug)
end
|
#instrument(name, payload = nil) ⇒ Object
87
88
89
90
|
# File 'lib/queue_kit/worker.rb', line 87
# Sends an event to the configured instrumenter.
#
# name    - event name; it is prefixed with "queuekit." before being sent.
# payload - optional Hash of event data (default: nil, treated as empty).
#
# The payload handed to the instrumenter is a new Hash with :worker set to
# this worker. The caller's Hash is never modified, so frozen or shared
# payloads are safe to pass.
#
# Returns whatever @instrumenter.instrument returns.
def instrument(name, payload = nil)
  event_payload = (payload || {}).merge(:worker => self)
  @instrumenter.instrument("queuekit.#{name}", event_payload)
end
|
#name ⇒ Object
68
69
70
|
# File 'lib/queue_kit/worker.rb', line 68
# Identifies this worker as "<class> <hostname>:<pid>". The string is built
# on first call and cached in @name; later calls return the cached value.
#
# Returns the String name.
def name
  return @name if @name

  host = Socket.gethostname
  pid  = Process.pid
  @name = "#{self.class} #{host}:#{pid}"
end
|
#process(item) ⇒ Object
18
19
20
|
# File 'lib/queue_kit/worker.rb', line 18
# Default processor. #initialize stores method(:process) as @processor when
# no :processor option is supplied; this default rejects every item.
#
# item - the item popped from the queue.
#
# Raises NotImplementedError, always, with the item's #inspect in the message.
def process(item)
  message = "This worker can't do anything with #{item.inspect}"
  raise NotImplementedError, message
end
|
#procline(string) ⇒ Object
43
44
45
46
|
# File 'lib/queue_kit/worker.rb', line 43
# Sets the process title ($0) to "QueueKit-<version>: <string>" and reports
# the change through #debug as a "worker.procline" event.
#
# string - the status text to show after the version prefix.
#
# Returns whatever #debug returns.
def procline(string)
  title = "QueueKit-#{QueueKit::VERSION}: #{string}"
  $0 = title
  debug do
    ["worker.procline", { :message => string }]
  end
end
|
#run ⇒ Object
33
34
35
36
37
38
39
40
41
|
# File 'lib/queue_kit/worker.rb', line 33
# Main loop: calls #start, then repeatedly calls #work while #working? is
# true, emitting a "worker.interval" debug event after each unit of work.
# The loop ends the first time #working? is false.
#
# Returns nil once the loop has been broken out of.
def run
  start

  # Built once and reused so each iteration passes the same block to #debug.
  interval_event = lambda { "worker.interval" }

  loop do
    break unless working?

    work
    debug(&interval_event)
  end
end
|
#set_popping_procline ⇒ Object
96
97
98
99
|
# File 'lib/queue_kit/worker.rb', line 96
# Records the current time in @last_job_at and updates the process title to
# "Waiting since <epoch seconds>" via #procline.
#
# Returns whatever #procline returns.
def set_popping_procline
  now = Time.now
  @last_job_at = now
  procline("Waiting since #{now.to_i}")
end
|
#set_working_procline ⇒ Object
92
93
94
|
# File 'lib/queue_kit/worker.rb', line 92
# Updates the process title to "Processing since <epoch seconds>" via
# #procline, using the current time.
#
# Returns whatever #procline returns.
def set_working_procline
  started_at = Time.now.to_i
  procline("Processing since #{started_at}")
end
|
#start ⇒ Object
72
73
74
75
76
|
# File 'lib/queue_kit/worker.rb', line 72
# Marks the worker as running. In order: emits a "worker.start" event, sets
# the "waiting" process title, then clears the stopped flag so #working?
# becomes true.
#
# Returns false (the value assigned to @stopped).
def start
  instrument("worker.start")
  set_popping_procline
  @stopped = false
end
|
#stop ⇒ Object
78
79
80
81
|
# File 'lib/queue_kit/worker.rb', line 78
# Marks the worker as stopped. Emits a "worker.stop" event, then sets the
# stopped flag so #working? becomes false.
#
# Returns true (the value assigned to @stopped).
def stop
  instrument("worker.stop")
  @stopped = true
end
|
#trap(signal_handler) ⇒ Object
29
30
31
|
# File 'lib/queue_kit/worker.rb', line 29
# Registers this worker with SignalChecker by delegating to
# SignalChecker.trap with the worker and the given handler. What
# SignalChecker does with them is not visible from this method.
#
# signal_handler - passed through to SignalChecker.trap unchanged.
#
# Returns whatever SignalChecker.trap returns.
def trap(signal_handler)
  worker = self
  SignalChecker.trap(worker, signal_handler)
end
|
#work ⇒ Object
48
49
50
|
# File 'lib/queue_kit/worker.rb', line 48
# Performs one unit of work (#work!) inside #wrap_error, so any exception
# raised is routed to the configured error handler.
#
# Returns whatever #wrap_error returns.
def work
  wrap_error do
    work!
  end
end
|
#work! ⇒ Object
52
53
54
55
56
57
58
59
60
|
# File 'lib/queue_kit/worker.rb', line 52
# Performs one unit of work without any error wrapping: pops an item from
# the queue and, if one was returned, hands it to the processor; otherwise
# calls the cooler.
#
# The process title is switched to "processing" before the processor runs
# and back to "waiting" after it returns. If the processor raises, the
# title is not switched back here.
#
# Returns the result of #set_popping_procline when an item was processed,
# or the cooler's result when the pop returned nil/false.
def work!
  item = @queue.pop
  return @cooler.call unless item

  set_working_procline
  @processor.call(item)
  set_popping_procline
end
|
#working? ⇒ Boolean
83
84
85
|
# File 'lib/queue_kit/worker.rb', line 83
# Reports whether the worker is currently running, i.e. whether the stopped
# flag is unset or false.
#
# Returns true or false.
def working?
  @stopped ? false : true
end
|
#wrap_error ⇒ Object
62
63
64
65
66
|
# File 'lib/queue_kit/worker.rb', line 62
# Runs the given block and routes anything it raises to @error_handler.
#
# The rescue is deliberately as wide as Exception: it also catches errors
# outside StandardError, such as the NotImplementedError raised by the
# default #process. Whether an error propagates is up to the handler.
#
# Returns the block's value, or the error handler's return value if the
# block raised.
def wrap_error
  begin
    yield
  rescue Exception => error
    @error_handler.call(error)
  end
end
|