Class: RockQueue::Worker
- Inherits: Object
  - Object
  - RockQueue::Worker
- Defined in: lib/rock-queue/worker.rb
Instance Attribute Summary
- #verbose ⇒ Object
  Whether the worker should log basic info to STDOUT.
Instance Method Summary
- #initialize(*queues) ⇒ Worker (constructor)
  Initialize connection to queue server.
- #queues ⇒ Object
  Returns the list of queues. A single ‘*’ means all queues.
- #work(interval = 5) ⇒ Object
  Main worker loop where all jobs are pulled off the queue.
Constructor Details
Instance Attribute Details
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
    # File 'lib/rock-queue/worker.rb', line 4
    def verbose
      @verbose
    end
Instance Method Details
#queues ⇒ Object
Returns the list of queues. A single ‘*’ means all queues.
    # File 'lib/rock-queue/worker.rb', line 51
    def queues
      @queues[0] == "*" ? RockQueue.queues : @queues
    end
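The wildcard rule can be tried in isolation. The sketch below is not part of the library: `ALL_QUEUES` and `resolve_queues` are hypothetical stand-ins for `RockQueue.queues` and `Worker#queues`, and the queue names are made up for illustration.

```ruby
# Hypothetical stand-in for RockQueue.queues (assumed to list every known queue).
ALL_QUEUES = %w[default mailers reports].freeze

# Mirrors the ternary in Worker#queues: if the first requested name is "*",
# every known queue is returned; otherwise the requested list is returned as-is.
def resolve_queues(requested, all_queues = ALL_QUEUES)
  requested[0] == "*" ? all_queues : requested
end

p resolve_queues(["*"])             # all known queues
p resolve_queues(["mailers"])       # only the named queue
p resolve_queues(["*", "mailers"])  # still all queues: only the first entry is checked
```

Note that only the first element is inspected, so a ‘*’ placed later in the list would be treated as a literal queue name.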
#work(interval = 5) ⇒ Object
Main worker loop where all jobs are pulled off the queue. This is also the place where every job starts and ends its lifecycle.
    # File 'lib/rock-queue/worker.rb', line 15
    def work(interval = 5)
      RockQueue.logger.info "=> Worker ready. Hold your horses!"
      stop = false
      loop do
        sleep(interval)

        ActiveRecord::Base.verify_active_connections!
        queues.each do |qname|
          obj, args = RockQueue.pop(qname)
          if obj
            queue = QueueObject.new(obj, args)
            begin
              # code that actually performs the action
              args = queue.args.first
              RockQueue.logger.info "=> Processing class #{queue.object.name} with params: #{args.inspect}"
              args.empty? ? queue.object.perform : queue.object.perform(args)
            rescue Object => e
              # Add failed processing and retry
              if queue.add_fail(e)
                sleep(queue.get_sleep_time)
                RockQueue.logger.error "=> Processing fail! Retrying #{queue.fails.length}"
                RockQueue.logger.error " Message: #{e.}"
                retry
              end
            end
            stop = false
          else
            stop = true if interval == 0
          end
        end
        break if stop
      end
    end
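The pop, perform, and retry cycle in the listing above can be sketched without a queue server. Everything in this example is hypothetical: `MemoryQueue`, `FlakyJob`, `drain`, and the `max_fails` counter are illustrative stand-ins, not RockQueue APIs. The sketch also rescues `StandardError` rather than `Object`, omits the sleep between attempts, and replaces `queue.add_fail` with a plain counter, so it shows the control flow only.

```ruby
# Hypothetical in-memory stand-in for the queue backend.
class MemoryQueue
  def initialize(jobs)
    @jobs = jobs.dup
  end

  # Returns the next [job_class, args] pair, or nil when the queue is empty.
  def pop
    @jobs.shift
  end
end

# Hypothetical job that fails twice before succeeding, to exercise the retry path.
class FlakyJob
  @attempts = 0

  class << self
    attr_reader :attempts

    def perform(*_args)
      @attempts += 1
      raise "temporary failure" if @attempts < 3
      :done
    end
  end
end

# Drains the queue once, retrying each failing job until it has failed
# max_fails times. Returns the results of the jobs that succeeded.
def drain(queue, max_fails: 3)
  results = []
  loop do
    entry = queue.pop
    break if entry.nil? # empty queue: stop, like work(0) in the listing

    job, args = entry
    fails = 0
    begin
      results << job.perform(*args)
    rescue StandardError
      fails += 1
      retry if fails < max_fails
      # Failure budget spent: drop the job and move on.
    end
  end
  results
end

p drain(MemoryQueue.new([[FlakyJob, ["payload"]]]))
p FlakyJob.attempts
```

As in the original loop, `retry` re-runs the whole `begin` block, so a job's `perform` should be safe to call more than once.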