Module: Nsqcd::Worker

Includes:
Concerns::Logging, ErrorReporter
Defined in:
lib/nsqcd/worker.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

Classes =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ErrorReporter

#worker_error

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



6
7
8
# File 'lib/nsqcd/worker.rb', line 6

def channel
  @channel
end

#idObject (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/nsqcd/worker.rb', line 6

def id
  @id
end

#optsObject (readonly)

Returns the value of attribute opts.



6
7
8
# File 'lib/nsqcd/worker.rb', line 6

def opts
  @opts
end

#topicObject (readonly)

Returns the value of attribute topic.



6
7
8
# File 'lib/nsqcd/worker.rb', line 6

def topic
  @topic
end

Class Method Details

.included(base) ⇒ Object



75
76
77
78
# File 'lib/nsqcd/worker.rb', line 75

def self.included(base)
  base.extend ClassMethods
  Classes << base if base.is_a? Class
end

Instance Method Details

#initialize(pool = nil, opts = {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/nsqcd/worker.rb', line 11

def initialize(pool = nil, opts = {})
  worker_opts = opts.merge(self.class.opts || {})
  worker_opts = Nsqcd::CONFIG.merge(worker_opts)

  @pool = pool || Concurrent::FixedThreadPool.new(worker_opts[:threads] || Nsqcd::Configuration::DEFAULTS[:threads])
  @call_with_params = respond_to?(:work_with_params)

  @opts = worker_opts
  puts '=================='
  puts "#{self.class.name} #{@opts.inspect}"
  puts '=================='

  @id = Utils.make_worker_id(self.class.name)
end

#log_msg(msg) ⇒ Object

Construct a log message with some standard prefix for this worker



65
66
67
# File 'lib/nsqcd/worker.rb', line 65

def log_msg(msg)
  "[#{@id}][#{Thread.current}][#{self.class.name}] #{msg}"
end

#process_work(msg) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/nsqcd/worker.rb', line 49

def process_work(msg)
  begin
    work(msg)
  rescue StandardError, ScriptError => ex
    worker_error(ex, log_msg: log_msg(msg), class: self.class.name, message: msg)
  end
end

#publish(msg, opts) ⇒ Object



43
44
45
46
47
# File 'lib/nsqcd/worker.rb', line 43

def publish(msg, opts)
  topic = opts.delete(:topic)
  producer = Nsq::Producer.new(opts[:nsqlookupd], topic)
  producer.write(msg)
end

#reject!Object



26
# File 'lib/nsqcd/worker.rb', line 26

def reject!; :reject; end

#requeue!Object



27
# File 'lib/nsqcd/worker.rb', line 27

def requeue!; :requeue; end

#runObject



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/nsqcd/worker.rb', line 29

def run
  worker_trace "New worker: #{self.class} running."
  consumer = Nsq::Consumer.new(@opts)
  @pool.post do
    loop do 
      msg = consumer.pop

      worker_trace "Working off: #{msg.data.inspect} #{msg.body}"
      process_work(msg)
      msg.finish
    end
  end
end

#stopObject



57
58
59
60
61
62
# File 'lib/nsqcd/worker.rb', line 57

def stop
  worker_trace "Stopping worker: shutting down thread pool."
  @pool.shutdown
  @pool.wait_for_termination
  worker_trace "Stopping worker: I'm gone."
end

#worker_trace(msg) ⇒ Object



69
70
71
# File 'lib/nsqcd/worker.rb', line 69

def worker_trace(msg)
  logger.debug(log_msg(msg))
end