Method: Beaneater::Jobs#process!
- Defined in:
- lib/beaneater/job/collection.rb
#process!(options = {}) ⇒ Object
Watch, reserve, process and delete or bury or release jobs.
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/beaneater/job/collection.rb', line 108 def process!(={}) release_delay = .delete(:release_delay) || RELEASE_DELAY reserve_timeout = .delete(:reserve_timeout) || RESERVE_TIMEOUT client.tubes.watch!(*processors.keys) while !stop? do begin @current_job = client.tubes.reserve(reserve_timeout) processor = processors[@current_job.tube] begin processor[:block].call(@current_job) @current_job.delete rescue *processor[:retry_on] if @current_job.stats.releases < processor[:max_retries] @current_job.release(:delay => release_delay) end end rescue AbortProcessingError break rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError retry rescue StandardError # handles unspecified errors @current_job.bury if @current_job ensure # bury if still reserved @current_job.bury if @current_job && @current_job.exists? && @current_job.reserved? @current_job = nil end end end |