Module: Nsqcd::Worker
- Includes:
- Concerns::Logging, ErrorReporter
- Defined in:
- lib/nsqcd/worker.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary collapse
- Classes = []
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #initialize(pool = nil, opts = {}) ⇒ Object
-
#log_msg(msg) ⇒ Object
Construct a log message with some standard prefix for this worker.
- #process_work(msg) ⇒ Object
- #publish(msg, opts) ⇒ Object
- #reject! ⇒ Object
- #requeue! ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
- #worker_trace(msg) ⇒ Object
Methods included from ErrorReporter
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
6 7 8 |
# File 'lib/nsqcd/worker.rb', line 6
#
# Read-only accessor.
#
# @return [Object] the current value of the @channel instance variable
def channel
  @channel
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
6 7 8 |
# File 'lib/nsqcd/worker.rb', line 6
#
# Read-only accessor.
#
# @return [Object] the current value of the @id instance variable
def id
  @id
end
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
6 7 8 |
# File 'lib/nsqcd/worker.rb', line 6
#
# Read-only accessor.
#
# @return [Object] the current value of the @opts instance variable
def opts
  @opts
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
6 7 8 |
# File 'lib/nsqcd/worker.rb', line 6
#
# Read-only accessor.
#
# @return [Object] the current value of the @topic instance variable
def topic
  @topic
end
Class Method Details
.included(base) ⇒ Object
75 76 77 78 |
# File 'lib/nsqcd/worker.rb', line 75
#
# Inclusion hook: extends the including object with ClassMethods and, when
# that object is a Class, appends it to the Classes list.
#
# @param base [Module] the class or module that included this module
# @return [Array, nil] Classes when base is a Class, otherwise nil
def self.included(base)
  base.extend(ClassMethods)
  # Modules that include this one are extended but not recorded.
  return unless base.is_a?(Class)

  Classes << base
end
Instance Method Details
#initialize(pool = nil, opts = {}) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/nsqcd/worker.rb', line 11
#
# Resolves the worker's options, picks a thread pool, and assigns an id.
#
# Options are layered, later entries winning on key conflicts:
# Nsqcd::CONFIG, then the opts argument, then the class-level opts.
#
# @param pool [Object, nil] pool to run on; when nil a
#   Concurrent::FixedThreadPool is created from the :threads option
# @param opts [Hash] per-instance options
def initialize(pool = nil, opts = {})
  class_level = self.class.opts || {}
  resolved = Nsqcd::CONFIG.merge(opts.merge(class_level))

  # The default pool is only built (and :threads only consulted) when the
  # caller did not hand one in.
  @pool = pool || Concurrent::FixedThreadPool.new(
    resolved[:threads] || Nsqcd::Configuration::DEFAULTS[:threads]
  )
  @call_with_params = respond_to?(:work_with_params)
  @opts = resolved

  # NOTE(review): this writes a banner to stdout on every construction and
  # looks like leftover debugging - confirm before relying on it.
  banner = '=================='
  puts banner, "#{self.class.name} #{@opts.inspect}", banner

  @id = Utils.make_worker_id(self.class.name)
end
#log_msg(msg) ⇒ Object
Construct a log message with some standard prefix for this worker
65 66 67 |
# File 'lib/nsqcd/worker.rb', line 65
#
# Construct a log message with some standard prefix for this worker.
# The prefix is the worker id, the current thread, and the class name, each
# in square brackets, followed by a space and the message.
#
# @param msg [Object] content to append after the prefix
# @return [String] the prefixed log line
def log_msg(msg)
  format('[%s][%s][%s] %s', @id, Thread.current, self.class.name, msg)
end
#process_work(msg) ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/nsqcd/worker.rb', line 49
#
# Hands a message to #work and reports any StandardError or ScriptError it
# raises through #worker_error instead of letting it propagate.
#
# NOTE(review): @call_with_params is set in #initialize but is not consulted
# in this listing - confirm whether work_with_params should be dispatched.
#
# @param msg [Object] the message to process
# @return [Object] the result of #work, or of #worker_error on failure
def process_work(msg)
  work(msg)
rescue StandardError, ScriptError => e
  worker_error(e, log_msg: log_msg(msg), class: self.class.name, message: msg)
end
#publish(msg, opts) ⇒ Object
43 44 45 46 47 |
# File 'lib/nsqcd/worker.rb', line 43
#
# Publishes one message to an NSQ topic through a short-lived producer.
#
# @param msg [String] the message body to write
# @param opts [Hash] must carry :topic; :nsqlookupd is forwarded to the
#   producer. The hash is not modified.
# @return [Object] whatever Nsq::Producer#write returns
def publish(msg, opts)
  # Read the topic rather than deleting it, so the caller's hash stays
  # intact and can be reused for later publishes.
  topic = opts[:topic]

  # Nsq::Producer takes a single options hash, not positional arguments.
  producer = Nsq::Producer.new(nsqlookupd: opts[:nsqlookupd], topic: topic)
  begin
    producer.write(msg)
  ensure
    # A producer is created per call; close it so connections do not pile up.
    producer.terminate
  end
end
#reject! ⇒ Object
26 |
# File 'lib/nsqcd/worker.rb', line 26
#
# @return [Symbol] always :reject
def reject!
  :reject
end
#requeue! ⇒ Object
27 |
# File 'lib/nsqcd/worker.rb', line 27
#
# @return [Symbol] always :requeue
def requeue!
  :requeue
end
#run ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/nsqcd/worker.rb', line 29
#
# Creates an Nsq::Consumer from @opts and posts a consume loop to the pool.
# Each popped message is traced, passed to #process_work, and then settled:
# requeued when the result is :requeue (the value #requeue! returns),
# finished otherwise.
#
# NOTE(review): the loop has no exit condition of its own and the consumer
# is not terminated here - confirm how shutdown is expected to end it.
#
# @return [Object] whatever @pool.post returns
def run
  worker_trace "New worker: #{self.class} running."
  consumer = Nsq::Consumer.new(@opts)

  @pool.post do
    loop do
      msg = consumer.pop
      worker_trace "Working off: #{msg.data.inspect} #{msg.body}"

      # Previously every message was finished regardless of the outcome, so
      # a handler returning requeue! had no effect.
      if process_work(msg) == :requeue
        msg.requeue
      else
        msg.finish
      end
    end
  end
end
#stop ⇒ Object
57 58 59 60 61 62 |
# File 'lib/nsqcd/worker.rb', line 57
#
# Shuts the thread pool down and waits for it to terminate, tracing before
# and after.
#
# NOTE(review): the loop posted by #run has no visible exit condition, so
# the wait may block for as long as that task keeps running - confirm.
#
# @return [Object] the result of the final #worker_trace call
def stop
  worker_trace "Stopping worker: shutting down thread pool."
  # tap yields the pool to #shutdown and hands the same pool back for the wait.
  @pool.tap(&:shutdown).wait_for_termination
  worker_trace "Stopping worker: I'm gone."
end
#worker_trace(msg) ⇒ Object
69 70 71 |
# File 'lib/nsqcd/worker.rb', line 69
#
# Writes a debug-level log entry carrying this worker's standard prefix.
#
# @param msg [Object] content passed to #log_msg for prefixing
# @return [Object] whatever logger.debug returns
def worker_trace(msg)
  line = log_msg(msg)
  logger.debug(line)
end