Module: Sneakers::Worker

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

Classes =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ErrorReporter

#worker_error

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/sneakers/worker.rb', line 6

def id
  @id
end

#optsObject (readonly)

Returns the value of attribute opts.



6
7
8
# File 'lib/sneakers/worker.rb', line 6

def opts
  @opts
end

#queueObject (readonly)

Returns the value of attribute queue.



6
7
8
# File 'lib/sneakers/worker.rb', line 6

def queue
  @queue
end

Class Method Details

.included(base) ⇒ Object



132
133
134
135
# File 'lib/sneakers/worker.rb', line 132

def self.included(base)
  base.extend ClassMethods
  Classes << base if base.is_a? Class
end

Instance Method Details

#ack!Object



34
# File 'lib/sneakers/worker.rb', line 34

def ack!; :ack end

#do_work(delivery_info, metadata, msg, handler) ⇒ Object



47
48
49
50
51
52
53
# File 'lib/sneakers/worker.rb', line 47

def do_work(delivery_info, , msg, handler)
  worker_trace "Working off: #{msg.inspect}"

  @pool.post do
    process_work(delivery_info, , msg, handler)
  end
end

#initialize(queue = nil, pool = nil, opts = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/sneakers/worker.rb', line 14

def initialize(queue = nil, pool = nil, opts = {})
  opts = opts.merge(self.class.queue_opts || {})
  queue_name = self.class.queue_name
  opts = Sneakers::CONFIG.merge(opts)

  @should_ack =  opts[:ack]
  @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads])
  @call_with_params = respond_to?(:work_with_params)
  @content_type = opts[:content_type]
  @content_encoding = opts[:content_encoding]

  @queue = queue || Sneakers::Queue.new(
    queue_name,
    opts
  )

  @opts = opts
  @id = Utils.make_worker_id(queue_name)
end

#log_msg(msg) ⇒ Object

Construct a log message with some standard prefix for this worker



122
123
124
# File 'lib/sneakers/worker.rb', line 122

def log_msg(msg)
  "[#{@id}][#{Thread.current}][#{@queue.name}][#{@queue.opts}] #{msg}"
end

#process_work(delivery_info, metadata, msg, handler) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/sneakers/worker.rb', line 55

def process_work(delivery_info, , msg, handler)
  res = nil
  error = nil

  begin
    metrics.increment("work.#{self.class.name}.started")
    metrics.timing("work.#{self.class.name}.time") do
      decoded_msg = ContentEncoding.decode(msg, @content_encoding ||  && [:content_encoding])
      deserialized_msg = ContentType.deserialize(decoded_msg, @content_type ||  && [:content_type])

      app = -> (deserialized_msg, delivery_info, , handler) do
        if @call_with_params
          work_with_params(deserialized_msg, delivery_info, )
        else
          work(deserialized_msg)
        end
      end

      middlewares = Sneakers.middleware.to_a
      block_to_call = middlewares.reverse.reduce(app) do |mem, h|
        h[:class].new(mem, *h[:args])
      end
      res = block_to_call.call(deserialized_msg, delivery_info, , handler)
    end
  rescue SignalException, SystemExit
    # ServerEngine handles these exceptions, so they are not expected to be raised within the worker.
    # Nevertheless, they are listed here to ensure that they are not caught by the rescue block below.
    raise
  rescue Exception => ex
    res = :error
    error = ex
    worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
                 message: msg, delivery_info: delivery_info, metadata: )
  ensure
    if @should_ack
      case res
      # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
      when :ack then handler.acknowledge(delivery_info, , msg)
      when :error then handler.error(delivery_info, , msg, error)
      when :reject then handler.reject(delivery_info, , msg)
      when :requeue then handler.reject(delivery_info, , msg, true)
      else
        handler.noop(delivery_info, , msg)
      end
      metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
    end

    metrics.increment("work.#{self.class.name}.ended")
  end
end

#publish(msg, opts) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/sneakers/worker.rb', line 38

def publish(msg, opts)
  to_queue = opts.delete(:to_queue)
  opts[:routing_key] ||= to_queue
  return unless opts[:routing_key]
  serialized_msg = Sneakers::ContentType.serialize(msg, opts[:content_type])
  encoded_msg = Sneakers::ContentEncoding.encode(serialized_msg, opts[:content_encoding])
  @queue.exchange.publish(encoded_msg, **opts)
end

#reject!Object



35
# File 'lib/sneakers/worker.rb', line 35

def reject!; :reject; end

#requeue!Object



36
# File 'lib/sneakers/worker.rb', line 36

def requeue!; :requeue; end

#runObject



115
116
117
118
119
# File 'lib/sneakers/worker.rb', line 115

def run
  worker_trace "New worker: subscribing."
  @queue.subscribe(self)
  worker_trace "New worker: I'm alive."
end

#stopObject



106
107
108
109
110
111
112
113
# File 'lib/sneakers/worker.rb', line 106

def stop
  worker_trace "Stopping worker: unsubscribing."
  @queue.unsubscribe
  worker_trace "Stopping worker: shutting down thread pool."
  @pool.shutdown
  @pool.wait_for_termination
  worker_trace "Stopping worker: I'm gone."
end

#worker_trace(msg) ⇒ Object



126
127
128
# File 'lib/sneakers/worker.rb', line 126

def worker_trace(msg)
  logger.debug(log_msg(msg))
end