Module: QueueKit::Worker
Instance Method Summary
collapse
#debug, #default_instrumenter, #enable_debug_mode, #force_debug, #instrument, #instrumenter, #instrumenter_from
Instance Method Details
#cool ⇒ Object
19
20
|
# File 'lib/queue_kit/worker.rb', line 19
def cool
end
|
#default_instrument_options ⇒ Object
94
95
96
|
# File 'lib/queue_kit/worker.rb', line 94
def default_instrument_options
{:worker => self}
end
|
#handle_error(err) ⇒ Object
22
23
24
|
# File 'lib/queue_kit/worker.rb', line 22
def handle_error(err)
raise err
end
|
#initialize(queue, options = {}) ⇒ Object
5
6
7
8
9
10
11
12
13
|
# File 'lib/queue_kit/worker.rb', line 5
def initialize(queue, options = {})
@queue = queue
@processor = options.fetch(:processor) { method(:process) }
@cooler = options.fetch(:cooler) { method(:cool) }
@error_handler = options.fetch(:error_handler) { method(:handle_error) }
@stopped = true
instrumenter_from(options)
end
|
#name ⇒ Object
66
67
68
|
# File 'lib/queue_kit/worker.rb', line 66
def name
@name ||= "#{self.class} #{Socket.gethostname}:#{Process.pid}"
end
|
#process(item) ⇒ Object
15
16
17
|
# File 'lib/queue_kit/worker.rb', line 15
def process(item)
raise NotImplementedError, "This worker can't do anything with #{item.inspect}"
end
|
#procline(string) ⇒ Object
41
42
43
44
|
# File 'lib/queue_kit/worker.rb', line 41
def procline(string)
$0 = "QueueKit-#{QueueKit::VERSION}: #{string}"
debug { ["worker.procline", {:message => string}] }
end
|
#run ⇒ Object
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/queue_kit/worker.rb', line 30
def run
start
interval_debugger = lambda { "worker.interval" }
loop do
work
break unless working?
debug(&interval_debugger)
end
end
|
#set_popping_procline ⇒ Object
89
90
91
92
|
# File 'lib/queue_kit/worker.rb', line 89
def set_popping_procline
@last_job_at = Time.now
procline("Waiting since #{@last_job_at.to_i}")
end
|
#set_working_procline ⇒ Object
85
86
87
|
# File 'lib/queue_kit/worker.rb', line 85
def set_working_procline
procline("Processing since #{Time.now.to_i}")
end
|
#start ⇒ Object
70
71
72
73
74
|
# File 'lib/queue_kit/worker.rb', line 70
def start
instrument "worker.start"
set_popping_procline
@stopped = false
end
|
#stop ⇒ Object
76
77
78
79
|
# File 'lib/queue_kit/worker.rb', line 76
def stop
instrument "worker.stop"
@stopped = true
end
|
#trap_signals(signal_handler) ⇒ Object
26
27
28
|
# File 'lib/queue_kit/worker.rb', line 26
def trap_signals(signal_handler)
SignalChecker.trap(self, signal_handler)
end
|
#work ⇒ Object
46
47
48
|
# File 'lib/queue_kit/worker.rb', line 46
def work
wrap_error { work! }
end
|
#work! ⇒ Object
50
51
52
53
54
55
56
57
58
|
# File 'lib/queue_kit/worker.rb', line 50
def work!
if item = @queue.pop
set_working_procline
@processor.call(item)
set_popping_procline
else
@cooler.call if working?
end
end
|
#working? ⇒ Boolean
81
82
83
|
# File 'lib/queue_kit/worker.rb', line 81
def working?
!@stopped
end
|
#wrap_error ⇒ Object
60
61
62
63
64
|
# File 'lib/queue_kit/worker.rb', line 60
def wrap_error
yield
rescue Exception => exception
@error_handler.call(exception)
end
|