Module: QueueKit::Worker

Defined in:
lib/queue_kit/worker.rb

Defined Under Namespace

Classes: PutsInstrumenter

Instance Method Summary

Instance Method Details

#cool ⇒ Object



22
23
# File 'lib/queue_kit/worker.rb', line 22

# No-op hook. #initialize installs this method as the default +:cooler+
# callable, which #work! calls when the queue hands back no item.
#
# @return [nil]
def cool
  nil
end

#debug ⇒ Object



105
106
# File 'lib/queue_kit/worker.rb', line 105

# Default debug hook: does nothing and never runs the block it is given.
# When the +:debug+ option is truthy, #initialize aliases this name to
# #force_debug on the individual instance.
#
# @return [nil]
def debug
  nil
end

#force_debug ⇒ Object



101
102
103
# File 'lib/queue_kit/worker.rb', line 101

# Debug hook that really reports: runs the given block and forwards what it
# returns to #instrument as the argument list.
#
# @yieldreturn [String, Array] an event name, or a +[name, payload]+ pair
# @return [Object] whatever #instrument returns
def force_debug
  event = yield
  instrument(*event)
end

#handle_error(err) ⇒ Object



25
26
27
# File 'lib/queue_kit/worker.rb', line 25

# Default error handler: re-raises the exception it receives. #initialize
# uses this method when no +:error_handler+ option is supplied, and
# #wrap_error passes every rescued exception to the configured handler.
#
# @param err [Exception] the exception rescued by #wrap_error
# @raise [Exception] always raises +err+
def handle_error(err)
  Kernel.raise(err)
end

#initialize(queue, options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/queue_kit/worker.rb', line 3

# Builds a worker around +queue+. Each collaborator can be replaced through
# +options+; a default is only constructed when its key is absent.
#
# @param queue [#pop] object that #work! pops items from
# @param options [Hash] optional collaborators and flags
# @option options [#call] :processor called with each popped item
#   (default: the #process method)
# @option options [#call] :cooler called when the queue returns nothing
#   (default: the #cool method)
# @option options [#call] :error_handler called with exceptions rescued by
#   #wrap_error (default: the #handle_error method)
# @option options [#instrument] :instrumenter receives events from
#   #instrument (default: a new PutsInstrumenter)
# @option options [Boolean] :debug when truthy, #debug becomes #force_debug
#   for this instance only (default: false)
def initialize(queue, options = {})
  @queue   = queue
  @stopped = true # the worker is idle until #start flips this

  @processor     = options.fetch(:processor)     { method(:process) }
  @cooler        = options.fetch(:cooler)        { method(:cool) }
  @error_handler = options.fetch(:error_handler) { method(:handle_error) }
  @instrumenter  = options.fetch(:instrumenter)  { PutsInstrumenter.new }

  if options.fetch(:debug, false)
    # Rebind #debug on this object's singleton class so other workers keep
    # the no-op version.
    class << self
      alias_method :debug, :force_debug
    end
  end
end

#instrument(name, payload = nil) ⇒ Object



87
88
89
90
# File 'lib/queue_kit/worker.rb', line 87

# Sends an event to the configured instrumenter. The event name is prefixed
# with "queuekit." and the payload gains a +:worker+ entry pointing at this
# object.
#
# Note that a payload hash passed by the caller is modified in place.
#
# @param name [#to_s] event name without the "queuekit." prefix
# @param payload [Hash, nil] extra event data; a new hash is used when nil
# @return [Object] whatever the instrumenter's +instrument+ returns
def instrument(name, payload = nil)
  payload ||= {}
  payload.update(:worker => self)

  event_name = "queuekit.#{name}"
  @instrumenter.instrument(event_name, payload)
end

#name ⇒ Object



68
69
70
# File 'lib/queue_kit/worker.rb', line 68

# Label for this worker, built from its class, the host name and the process
# id, in the form "Class host:pid". The string is computed once and cached in
# +@name+.
#
# @return [String]
def name
  @name ||= format("%s %s:%s", self.class, Socket.gethostname, Process.pid)
end

#process(item) ⇒ Object

Raises:

  • (NotImplementedError)


18
19
20
# File 'lib/queue_kit/worker.rb', line 18

# Default item processor; it always fails. #initialize uses this method when
# no +:processor+ option is given, so a worker needs either that option or
# its own #process.
#
# @param item [Object] the item popped from the queue
# @raise [NotImplementedError] always
def process(item)
  message = "This worker can't do anything with #{item.inspect}"
  raise NotImplementedError, message
end

#procline(string) ⇒ Object



43
44
45
46
# File 'lib/queue_kit/worker.rb', line 43

# Assigns the program name (+$0+) to a QueueKit-branded status line and
# reports the change through #debug as a "worker.procline" event.
#
# @param string [String] status text placed after the version prefix
# @return [Object] whatever #debug returns
def procline(string)
  title = "QueueKit-#{QueueKit::VERSION}: #{string}"
  $0 = title

  debug do
    ["worker.procline", { :message => string }]
  end
end

#run ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/queue_kit/worker.rb', line 33

# Main loop: starts the worker, then keeps calling #work for as long as
# #working? is true. After each unit of work a "worker.interval" event is
# offered to #debug.
#
# @return [nil] once #working? turns false
def run
  start
  interval_event = lambda { "worker.interval" }

  loop do
    break unless working?

    work
    debug(&interval_event)
  end
end

#set_popping_procline ⇒ Object



96
97
98
99
# File 'lib/queue_kit/worker.rb', line 96

# Records the current time in +@last_job_at+ and updates the procline to show
# that the worker has been waiting since that moment (as epoch seconds).
#
# @return [Object] whatever #procline returns
def set_popping_procline
  now = Time.now
  @last_job_at = now
  procline("Waiting since #{now.to_i}")
end

#set_working_procline ⇒ Object



92
93
94
# File 'lib/queue_kit/worker.rb', line 92

# Updates the procline to show that the worker has been processing since the
# current time (as epoch seconds).
#
# @return [Object] whatever #procline returns
def set_working_procline
  started_at = Time.now.to_i
  procline("Processing since #{started_at}")
end

#start ⇒ Object



72
73
74
75
76
# File 'lib/queue_kit/worker.rb', line 72

# Marks the worker as running: emits "worker.start", switches the procline to
# the waiting state, then clears the stopped flag so #working? becomes true.
#
# @return [false] the new value of +@stopped+
def start
  instrument("worker.start")
  set_popping_procline
  @stopped = false
end

#stop ⇒ Object



78
79
80
81
# File 'lib/queue_kit/worker.rb', line 78

# Asks the worker to stop: emits "worker.stop" and sets the stopped flag, so
# #working? returns false and the loop in #run exits on its next check.
#
# @return [true] the new value of +@stopped+
def stop
  instrument("worker.stop")
  @stopped = true
end

#trap(signal_handler) ⇒ Object



29
30
31
# File 'lib/queue_kit/worker.rb', line 29

# Hands this worker and +signal_handler+ to SignalChecker.trap.
#
# @param signal_handler [Object] passed through untouched to SignalChecker
# @return [Object] whatever SignalChecker.trap returns
def trap(signal_handler)
  worker = self
  SignalChecker.trap(worker, signal_handler)
end

#work ⇒ Object



48
49
50
# File 'lib/queue_kit/worker.rb', line 48

# Performs one unit of work (#work!) inside #wrap_error, so any exception is
# routed to the configured error handler.
#
# @return [Object] whatever #wrap_error returns
def work
  wrap_error do
    work!
  end
end

#work! ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/queue_kit/worker.rb', line 52

# Pops one item from the queue and processes it, with no error handling.
# When the queue returns nil or false, the cooler is called instead.
#
# @return [Object] the result of #set_popping_procline after processing an
#   item, or the cooler's result when there was nothing to do
def work!
  item = @queue.pop
  return @cooler.call unless item

  set_working_procline
  @processor.call(item)
  set_popping_procline
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/queue_kit/worker.rb', line 83

# Whether the worker is currently running, i.e. the stopped flag is not set.
#
# @return [Boolean] false while +@stopped+ is truthy, true otherwise
def working?
  @stopped ? false : true
end

#wrap_error ⇒ Object



62
63
64
65
66
# File 'lib/queue_kit/worker.rb', line 62

# Runs the given block and hands any exception it raises to the configured
# error handler.
#
# The rescue covers Exception rather than StandardError, so errors such as the
# NotImplementedError raised by #process also reach the handler.
# NOTE(review): this also catches SystemExit and signal exceptions; a handler
# that does not re-raise will swallow them. Confirm that is intended.
#
# @yield the work to protect
# @return [Object] the block's value, or the error handler's value when the
#   block raised
def wrap_error
  begin
    yield
  rescue Exception => error
    @error_handler.call(error)
  end
end