Module: Nsqcd::WorkerGroup
- Defined in:
- lib/nsqcd/workergroup.rb
Instance Method Summary collapse
-
#after_fork ⇒ Object
note! this is not Serverengine#after_start, this is ours!.
- #before_fork ⇒ Object
- #initialize ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
Instance Method Details
#after_fork ⇒ Object
note! this is not Serverengine#after_start, this is ours!
14 15 16 17 |
# File 'lib/nsqcd/workergroup.rb', line 14 def after_fork # note! this is not Serverengine#after_start, this is ours! fafter = Nsqcd::CONFIG[:hooks][:after_fork] fafter.call if fafter end |
#before_fork ⇒ Object
9 10 11 12 |
# File 'lib/nsqcd/workergroup.rb', line 9 def before_fork fbefore = Nsqcd::CONFIG[:hooks][:before_fork] fbefore.call if fbefore end |
#initialize ⇒ Object
5 6 7 |
# File 'lib/nsqcd/workergroup.rb', line 5 def initialize @stop_flag = ServerEngine::BlockingFlag.new end |
#run ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/nsqcd/workergroup.rb', line 19 def run after_fork # Allocate single thread pool if share_threads is set. This improves load balancing # when used with many workers. pool = config[:share_threads] ? Concurrent::FixedThreadPool.new(config[:threads]) : nil worker_classes = config[:worker_classes] if worker_classes.respond_to? :call worker_classes = worker_classes.call end # If we don't provide a connection to a worker, # the queue used in the worker will create a new one @workers = worker_classes.map do |worker_class| worker_class.new( pool, { connection: config[:connection] }) end # if more than one worker this should be per worker # accumulate clients and consumers as well @workers.each do |worker| worker.run end # end per worker # until @stop_flag.wait_for_set(Nsqcd::CONFIG[:heartbeat]) Nsqcd.logger.debug("Heartbeat: running threads [#{Thread.list.count}]") # report aggregated stats? end end |