Class: RabbitJobs::Worker
- Inherits:
-
Object
- Object
- RabbitJobs::Worker
- Includes:
- AmqpHelpers, Logger
- Defined in:
- lib/rabbit_jobs/worker.rb
Instance Attribute Summary collapse
-
#background ⇒ Object
Returns the value of attribute background.
-
#pidfile ⇒ Object
Returns the value of attribute pidfile.
Instance Method Summary collapse
-
#initialize(*queues) ⇒ Worker
constructor
Workers should be initialized with an array of string queue names.
- #kill_child ⇒ Object
- #queues ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #startup ⇒ Object
-
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs.
Methods included from Logger
Methods included from AmqpHelpers
#amqp_with_exchange, #amqp_with_queue, #make_queue
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
21 22 23 24 25 26 |
# File 'lib/rabbit_jobs/worker.rb', line 21 def initialize(*queues) @queues = queues.map { |queue| queue.to_s.strip }.flatten.uniq if @queues == ['*'] || @queues.empty? @queues = RabbitJobs.config.routing_keys end end |
Instance Attribute Details
#background ⇒ Object
Returns the value of attribute background.
8 9 10 |
# File 'lib/rabbit_jobs/worker.rb', line 8 def background @background end |
#pidfile ⇒ Object
Returns the value of attribute pidfile.
8 9 10 |
# File 'lib/rabbit_jobs/worker.rb', line 8 def pidfile @pidfile end |
Instance Method Details
#kill_child ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/rabbit_jobs/worker.rb', line 107 def kill_child if @job && @job.child_pid # log! "Killing child at #{@child}" if Kernel.system("ps -o pid,state -p #{@job.child_pid}") Process.kill("KILL", @job.child_pid) rescue nil else # log! "Child #{@child} not found, restarting." # shutdown end end end |
#queues ⇒ Object
28 29 30 |
# File 'lib/rabbit_jobs/worker.rb', line 28 def queues @queues || ['default'] end |
#shutdown ⇒ Object
79 80 81 |
# File 'lib/rabbit_jobs/worker.rb', line 79 def shutdown @shutdown = true end |
#shutdown! ⇒ Object
102 103 104 105 |
# File 'lib/rabbit_jobs/worker.rb', line 102 def shutdown! shutdown kill_child end |
#startup ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rabbit_jobs/worker.rb', line 83 def startup # prune_dead_workers Process.daemon(true) if self.background if self.pidfile File.open(self.pidfile, 'w') { |f| f << Process.pid } end # Fix buffering so we can `rake rj:work > resque.log` and # get output from the child in there. $stdout.sync = true @shutdown = false Signal.trap('TERM') { shutdown } Signal.trap('INT') { shutdown! } end |
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rabbit_jobs/worker.rb', line 33 def work(time = 0) startup processed_count = 0 amqp_with_exchange do |connection, exchange| exchange.channel.prefetch(1) check_shutdown = Proc.new { if @shutdown log "Processed jobs: #{processed_count}" log "Stopping worker..." connection.close { File.delete(self.pidfile) if self.pidfile EM.stop { exit! } } end } queues.each do |routing_key| queue = make_queue(exchange, routing_key) log "Worker ##{Process.pid} <= #{exchange.name}##{routing_key}" queue.subscribe(ack: true) do |, payload| @job = RabbitJobs::Job.parse(payload) @job.run_perform unless @job.expired? .ack processed_count += 1 check_shutdown.call end end if time > 0 # for debugging EM.add_timer(time) do self.shutdown end end EM.add_periodic_timer(1) do check_shutdown.call end end end |