Class: Chimp::QueueWorker
- Inherits:
-
Object
- Object
- Chimp::QueueWorker
- Defined in:
- lib/right_chimp/queue/queue_worker.rb
Instance Attribute Summary collapse
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#never_exit ⇒ Object
Returns the value of attribute never_exit.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
Instance Method Summary collapse
-
#initialize ⇒ QueueWorker
constructor
A new instance of QueueWorker.
-
#run ⇒ Object
Grab work items from the ChimpQueue and process them. Only stops if @never_exit is false.
Constructor Details
#initialize ⇒ QueueWorker
Returns a new instance of QueueWorker.
9 10 11 12 13 |
# File 'lib/right_chimp/queue/queue_worker.rb', line 9
#
# Builds a worker with its default settings: no delay, a retry count of
# zero, and the never-exit flag switched on (the flag that #run loops on).
def initialize
  @delay = 0
  @retry_count = 0
  @never_exit = true
end
Instance Attribute Details
#delay ⇒ Object
Returns the value of attribute delay.
7 8 9 |
# File 'lib/right_chimp/queue/queue_worker.rb', line 7
#
# Reader for the @delay instance variable.
#
# @return [Object] the current value of @delay
def delay
  @delay
end
#never_exit ⇒ Object
Returns the value of attribute never_exit.
7 8 9 |
# File 'lib/right_chimp/queue/queue_worker.rb', line 7
#
# Reader for the @never_exit instance variable.
#
# @return [Object] the current value of @never_exit
def never_exit
  @never_exit
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
7 8 9 |
# File 'lib/right_chimp/queue/queue_worker.rb', line 7
#
# Reader for the @retry_count instance variable.
#
# @return [Object] the current value of @retry_count
def retry_count
  @retry_count
end
Instance Method Details
#run ⇒ Object
Grab work items from the ChimpQueue and process them. Only stops if @never_exit is false.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/right_chimp/queue/queue_worker.rb', line 19
#
# Pulls work items off ChimpQueue and runs them, looping for as long as
# @never_exit is truthy.
#
# For every item that is shifted off the queue the worker:
# 1. stamps it with this worker's @retry_count and the current thread's
#    object id,
# 2. updates the ChimpDaemon processing bookkeeping for the item's group
#    while holding the daemon semaphore, and
# 3. calls the item's own #run.
#
# When the queue yields nil the worker sleeps for one second before polling
# again. Any exception raised while handling an iteration is logged; if a
# work item was in hand it is marked with Executor::STATUS_ERROR and the
# exception is stored on it.
#
# @return [nil] once @never_exit becomes falsy
def run
  while @never_exit
    work_item = ChimpQueue.instance.shift

    begin
      if work_item.nil?
        # Nothing queued right now; back off before polling again.
        sleep 1
      else
        job_uuid = work_item.job_uuid
        group = work_item.group.group_id

        work_item.retry_count = @retry_count
        work_item.owner = Thread.current.object_id

        daemon = ChimpDaemon.instance
        daemon.semaphore.synchronize do
          # Per the original author's note, this bookkeeping only applies when
          # running with chimpd; with no entry for the group there is nothing
          # to update.
          jobs = daemon.queue.processing[group]

          unless jobs.nil?
            job_key = job_uuid.to_sym
            remaining = jobs[job_key]

            if remaining == 0
              # Counter has reached zero: remove the job from the processing queue.
              Log.debug "Completed processing task #{job_uuid}"
              Log.debug "Deleting #{job_uuid}"
              jobs.delete(job_key)
              Log.debug daemon.queue.processing.inspect

              daemon.proc_counter -= 1
            elsif remaining.nil?
              Log.debug 'Job group was already deleted, no counter to decrease.'
            else
              Log.debug "Decreasing processing counter (#{daemon.proc_counter}) " \
                        "for [#{job_uuid}] group: #{group}"

              jobs[job_key] -= 1

              # proc_counter is logged again before it is decremented below,
              # so both messages report the same value (as in the original).
              Log.debug "Processing counter now (#{daemon.proc_counter}) " \
                        "for [#{job_uuid}] group: #{group}"
              Log.debug jobs.inspect
              Log.debug "Still counting down for #{job_uuid}"
              daemon.proc_counter -= 1
            end
          end
        end

        work_item.run
      end
    rescue Exception => ex
      # NOTE(review): rescuing Exception (not just StandardError) is kept from
      # the original, presumably so that no failure inside a work item can
      # terminate the worker loop -- confirm before narrowing it.
      Log.error "Exception in QueueWorker.run: #{ex}"
      Log.debug ex.inspect
      Log.debug ex.backtrace

      # work_item is nil when the failure happened on the idle (sleep) path;
      # touching it there would raise NoMethodError from inside the rescue
      # and kill the worker.
      unless work_item.nil?
        work_item.status = Executor::STATUS_ERROR
        work_item.error = ex
      end
    end
  end
end