Class: Beanstalker::Worker
- Inherits:
-
Object
- Object
- Beanstalker::Worker
- Defined in:
- lib/beanstalker/worker.rb
Constant Summary collapse
- SLEEP_TIME =
rails loads this file twice
60
Class Attribute Summary collapse
-
.before_filter ⇒ Object
Returns the value of attribute before_filter.
-
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
-
.custom_timeout_handler ⇒ Object
Returns the value of attribute custom_timeout_handler.
-
.finish ⇒ Object
Returns the value of attribute finish.
-
.on_job_event ⇒ Object
Returns the value of attribute on_job_event.
Class Method Summary collapse
- .before_reserve(&block) ⇒ Object
- .before_reserves ⇒ Object
- .default_handle_error(job, ex) ⇒ Object
- .default_handle_timeout(job) ⇒ Object
- .error_handler(&block) ⇒ Object
- .run_before_reserve ⇒ Object
- .timeout_handler(&block) ⇒ Object
Instance Method Summary collapse
- #brief?(t1, t2) ⇒ Boolean
- #class_error_handler(klass) ⇒ Object
- #dispatch(job) ⇒ Object
- #do_all_work ⇒ Object
- #get_job ⇒ Object
- #get_job_body(job) ⇒ Object
- #handle_error(job, ex) ⇒ Object
- #handle_timeout(job) ⇒ Object
-
#initialize(top_binding, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #logger ⇒ Object
- #main_loop ⇒ Object
- #mapped_job?(job) ⇒ Boolean
- #q_hint ⇒ Object
- #rails_job?(job) ⇒ Boolean
-
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving.
- #run ⇒ Object
- #run_ao_job(job) ⇒ Object
- #run_code(job_id, code) ⇒ Object
- #run_mapped_job(job) ⇒ Object
- #run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object
- #safe_dispatch(job) ⇒ Object
- #shutdown ⇒ Object
- #startup ⇒ Object
Constructor Details
#initialize(top_binding, options = {}) ⇒ Worker
Returns a new instance of Worker.
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/beanstalker/worker.rb', line 55
# Builds a worker.
#
# top_binding - binding kept in @top_binding; run_code evaluates job code
#               against it.
# options     - Hash of settings kept in @options. When it has :servers, a
#               Beanstalk::Pool built from that value replaces
#               Beanstalker::Queue.queue. Other methods of this class read
#               :tube and :ruby_timeout from it.
def initialize(top_binding, options = {})
  mapper_file = "#{RAILS_ROOT}/config/beanstalker_mapper.rb"
  # The mapper is optional: it is only created when the mapping file exists.
  @mapper = Beanstalker::Mapper.new(mapper_file) if File.exist?(mapper_file)
  @top_binding = top_binding
  @stop = false
  @options = options
  if @options && @options[:servers]
    Beanstalker::Queue.queue = Beanstalk::Pool.new(@options[:servers])
  end
end
Class Attribute Details
.before_filter ⇒ Object
Returns the value of attribute before_filter.
31 32 33 |
# File 'lib/beanstalker/worker.rb', line 31
# Reader for the class-level before_filter hook (nil until assigned).
# run_ao_job calls the hook with the job and only runs the job's code when
# the hook returns a truthy value.
def before_filter
  @before_filter
end
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
29 30 31 |
# File 'lib/beanstalker/worker.rb', line 29
# Reader for the class-level error handler (nil until assigned).
# handle_error calls it with (job, exception) in place of
# default_handle_error when it is set.
def custom_error_handler
  @custom_error_handler
end
.custom_timeout_handler ⇒ Object
Returns the value of attribute custom_timeout_handler.
30 31 32 |
# File 'lib/beanstalker/worker.rb', line 30
# Reader for the class-level timeout handler (nil until assigned).
# handle_timeout calls it with the job in place of default_handle_timeout
# when it is set.
def custom_timeout_handler
  @custom_timeout_handler
end
.finish ⇒ Object
Returns the value of attribute finish.
28 29 30 |
# File 'lib/beanstalker/worker.rb', line 28
# Reader for the class-level finish hook (nil until assigned).
# do_all_work calls it without arguments when it is set.
def finish
  @finish
end
.on_job_event ⇒ Object
Returns the value of attribute on_job_event.
32 33 34 |
# File 'lib/beanstalker/worker.rb', line 32
# Reader for the class-level job event hook (nil until assigned).
# When set, dispatch calls it with (job, :dispatch) and handle_error calls
# it with (job, :error).
def on_job_event
  @on_job_event
end
Class Method Details
.before_reserve(&block) ⇒ Object
46 47 48 |
# File 'lib/beanstalker/worker.rb', line 46
# Registers a block to be run by run_before_reserve.
# Returns the list of registered hooks.
def before_reserve(&block)
  before_reserves.push(block)
end
.before_reserves ⇒ Object
42 43 44 |
# File 'lib/beanstalker/worker.rb', line 42
# Returns the list of before-reserve hooks, creating an empty list on first
# use. The same Array is returned on later calls.
def before_reserves
  @before_reserves = [] unless @before_reserves
  @before_reserves
end
.default_handle_error(job, ex) ⇒ Object
246 247 248 249 250 251 252 |
# File 'lib/beanstalker/worker.rb', line 246
# Default reaction to a failed job: log the failure and the exception, then
# call job.decay.
#
# job - the failed job; must respond to server, id and decay.
# ex  - the exception that made the job fail.
#
# A Beanstalk::UnexpectedResponse raised along the way is logged and
# swallowed; other exceptions propagate.
def self.default_handle_error(job, ex)
  Daemonizer.logger.info "Job failed: #{job.server}/#{job.id}"
  # An exception that was built but never raised has a nil backtrace;
  # Array() turns that into an empty list instead of failing on nil.join.
  backtrace = Array(ex.backtrace).join("\n")
  Daemonizer.logger.info("#{ex.class}: #{ex}\n" + backtrace)
  job.decay
rescue Beanstalk::UnexpectedResponse => e
  Daemonizer.logger.info "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{e.inspect}"
end
.default_handle_timeout(job) ⇒ Object
254 255 256 257 258 259 |
# File 'lib/beanstalker/worker.rb', line 254
# Default reaction to a timed-out job: log the timeout, then call job.decay.
# A Beanstalk::UnexpectedResponse raised along the way is logged and
# swallowed; other exceptions propagate.
def self.default_handle_timeout(job)
  begin
    Daemonizer.logger.info("Job timeout: #{job.server}/#{job.id}")
    job.decay
  rescue Beanstalk::UnexpectedResponse => beanstalk_error
    Daemonizer.logger.info("Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{beanstalk_error.inspect}")
  end
end
.error_handler(&block) ⇒ Object
34 35 36 |
# File 'lib/beanstalker/worker.rb', line 34
# DSL-style setter: stores the given block as custom_error_handler.
# handle_error later calls that handler with (job, exception).
def error_handler(&block)
  handler = block
  self.custom_error_handler = handler
end
.run_before_reserve ⇒ Object
50 51 52 |
# File 'lib/beanstalker/worker.rb', line 50
# Calls every registered before-reserve hook, in registration order, with no
# arguments. get_job invokes this before each reserve attempt.
def run_before_reserve
  before_reserves.each(&:call)
end
.timeout_handler(&block) ⇒ Object
38 39 40 |
# File 'lib/beanstalker/worker.rb', line 38
# DSL-style setter: stores the given block as custom_timeout_handler.
# handle_timeout later calls that handler with the job.
def timeout_handler(&block)
  handler = block
  self.custom_timeout_handler = handler
end
Instance Method Details
#brief?(t1, t2) ⇒ Boolean
124 125 126 |
# File 'lib/beanstalker/worker.rb', line 124
# True when t1 and t2 are less than ten hundredths of a second apart, in
# either order. The difference is truncated to whole hundredths first.
def brief?(t1, t2)
  elapsed_hundredths = ((t2 - t1) * 100).to_i
  elapsed_hundredths.abs < 10
end
#class_error_handler(klass) ⇒ Object
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/beanstalker/worker.rb', line 184
# Looks up a per-class error handler.
#
# Returns klass.async_error_handler when klass responds to that method and
# the value is a Proc; returns false in every other case (including nil).
def class_error_handler(klass)
  return false unless klass.respond_to?(:async_error_handler)

  candidate = klass.async_error_handler
  candidate.is_a?(Proc) ? candidate : false
end
#dispatch(job) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/beanstalker/worker.rb', line 153
# Routes a reserved job to the runner that matches its kind.
#
# Verifies ActiveRecord connections, logs the job body, fires the
# on_job_event hook with :dispatch when one is set, then:
#   * rails jobs  -> run_ao_job
#   * mapped jobs -> run_mapped_job
#   * anything else is logged as an error and deleted.
def dispatch(job)
  ActiveRecord::Base.verify_active_connections!
  logger.info "Got job: #{get_job_body(job).inspect}"

  event_hook = self.class.on_job_event
  event_hook.call(job, :dispatch) if event_hook

  return run_ao_job(job) if rails_job?(job)
  return run_mapped_job(job) if mapped_job?(job)

  logger.error "Job #{job.inspect} cannot be processed... deleteing"
  job.delete
end
#do_all_work ⇒ Object
329 330 331 332 333 |
# File 'lib/beanstalker/worker.rb', line 329
# Logs that the worker is finishing and calls the class-level finish hook
# when one is set. Returns the hook's result, or nil without a hook.
def do_all_work
  logger.info 'finishing all running jobs'
  finish_hook = self.class.finish
  return unless finish_hook

  finish_hook.call
end
#get_job ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/beanstalker/worker.rb', line 128
# Loops until a job has been reserved, then returns it.
#
# Each attempt connects the queue, runs the before-reserve hooks and calls
# reserve_and_set_hint. Signals (including Interrupt) are re-raised. A
# Beanstalk::DeadlineSoonError is logged and the attempt is repeated at once.
# Any other exception clears the connection hint, is logged, and the loop
# sleeps SLEEP_TIME seconds before trying again.
def get_job
  loop do
    begin
      Beanstalker::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue Interrupt, SignalException => signal
      # Never swallow signals: the catch-all clause below would otherwise
      # trap them.
      raise signal
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      Daemonizer.logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => failure
      @q_hint = nil # in case there's something wrong with this conn
      Daemonizer.logger.info("#{failure.class}: #{failure}\n#{failure.backtrace.join("\n")}")
      Daemonizer.logger.info 'something is wrong. We failed to get a job.'
      Daemonizer.logger.info "sleeping for #{SLEEP_TIME}s..."
      sleep(SLEEP_TIME)
    end
  end
end
#get_job_body(job) ⇒ Object
295 296 297 |
# File 'lib/beanstalker/worker.rb', line 295
# Returns the job's ybody converted with with_indifferent_access, which is
# why callers in this class read it with both string and symbol keys.
def get_job_body(job)
  raw_body = job.ybody
  raw_body.with_indifferent_access
end
#handle_error(job, ex) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/beanstalker/worker.rb', line 195
# Reacts to an exception raised while a job was being processed.
#
# job - the job being processed when the exception occurred.
# ex  - the exception.
#
# Steps:
#   1. Log the backtrace and fire the on_job_event hook with :error.
#   2. For rails jobs whose data names a class, look for that class'
#      async_error_handler (see class_error_handler) and call it with
#      (job, ex). The handler counts as having dealt with the job when the
#      job is no longer in the 'reserved' state afterwards, or no longer
#      exists.
#   3. Otherwise run the common handler: custom_error_handler when set,
#      default_handle_error when not.
#
# Returns nil when a class handler dealt with the job, otherwise the common
# handler's result.
def handle_error(job, ex)
  custom_error_handler_ok = false
  # Array() keeps this log line working for exceptions without a backtrace.
  Daemonizer.logger.warn "Handling exception: #{Array(ex.backtrace).join("\n")}, job = #{job.id}"
  self.class.on_job_event.call(job, :error) if self.class.on_job_event

  if rails_job?(job)
    # A rails job may have failed precisely because its body has no data;
    # indexing nil here would raise from inside the error handler and
    # escape safe_dispatch, so a missing data hash is tolerated.
    job_data = get_job_body(job)[:data]
    class_name = job_data && job_data[:class]
    if class_name
      klass = begin
        class_name.constantize
      rescue Exception
        # Unknown class: fall through to the common handler.
        nil
      end
      error_handler = class_error_handler(klass)
      if error_handler.is_a?(Proc)
        Daemonizer.logger.info "Running custom error handler for class #{class_name}, job = #{job.id}"
        error_handler.call(job, ex)
        job_reserved = begin
          job.stats['state'] == 'reserved'
        rescue Beanstalk::NotFoundError
          # The job is gone, so it is certainly not reserved any more.
          false
        end
        if job_reserved
          Daemonizer.logger.info "Custom error handler for class #{class_name} didn't release job. job = #{job.id}"
        else
          Daemonizer.logger.info "Custom error handler for class #{class_name} released job. job = #{job.id}"
          custom_error_handler_ok = true
        end
      end
    end
  end

  return if custom_error_handler_ok

  Daemonizer.logger.info "Running common handler. job = #{job.id}"
  if self.class.custom_error_handler
    self.class.custom_error_handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end
#handle_timeout(job) ⇒ Object
238 239 240 241 242 243 244 |
# File 'lib/beanstalker/worker.rb', line 238
# Reacts to a job that timed out: calls the class-level
# custom_timeout_handler with the job when one is set, otherwise
# default_handle_timeout.
def handle_timeout(job)
  custom = self.class.custom_timeout_handler
  return custom.call(job) if custom

  self.class.default_handle_timeout(job)
end
#logger ⇒ Object
108 109 110 |
# File 'lib/beanstalker/worker.rb', line 108
# Returns Daemonizer.logger when Daemonizer is defined and provides a
# logger; falls back to RAILS_DEFAULT_LOGGER otherwise.
def logger
  daemon_logger = Daemonizer.logger if defined?(Daemonizer)
  daemon_logger || RAILS_DEFAULT_LOGGER
end
#main_loop ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/beanstalker/worker.rb', line 66
# Reserves and dispatches jobs until a TERM signal sets @stop.
# The flag is only checked between jobs, before the next get_job call.
def main_loop
  trap('TERM') { @stop = true }
  loop do
    break if @stop

    next_job = get_job
    safe_dispatch(next_job)
  end
end
#mapped_job?(job) ⇒ Boolean
325 326 327 |
# File 'lib/beanstalker/worker.rb', line 325
# Whether the mapper can handle this job's 'kind'.
# Returns @mapper itself (nil/false) when no mapper was loaded, otherwise
# the mapper's can_handle_kind? answer.
def mapped_job?(job)
  return @mapper unless @mapper

  kind = get_job_body(job)['kind']
  @mapper.can_handle_kind?(kind)
end
#q_hint ⇒ Object
104 105 106 |
# File 'lib/beanstalker/worker.rb', line 104
# The connection to reserve from next: the remembered @q_hint when there is
# one, otherwise the shared Beanstalker::Queue.queue.
def q_hint
  return @q_hint if @q_hint

  Beanstalker::Queue.queue
end
#rails_job?(job) ⇒ Boolean
321 322 323 |
# File 'lib/beanstalker/worker.rb', line 321
# True when the job body's 'kind', converted to a String, is
# 'rails_beanstalker'.
def rails_job?(job)
  kind = get_job_body(job)['kind']
  'rails_beanstalker' == kind.to_s
end
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.
116 117 118 119 120 121 122 |
# File 'lib/beanstalker/worker.rb', line 116
# Reserves a job from q_hint and returns it, updating @q_hint on the way out.
#
# The ensure clause runs whether or not reserve raised: @q_hint becomes the
# job's connection when the reserve was brief (see brief?) and produced a
# job, and nil otherwise.
def reserve_and_set_hint
  started_at = Time.now.utc
  job = q_hint.reserve
  job
ensure
  finished_at = Time.now.utc
  @q_hint = (brief?(started_at, finished_at) && job) ? job.conn : nil
end
#run ⇒ Object
97 98 99 100 101 102 |
# File 'lib/beanstalker/worker.rb', line 97
# Worker entry point: runs startup, then main_loop. An Interrupt raised by
# either one triggers shutdown instead of propagating.
def run
  begin
    startup
    main_loop
  rescue Interrupt
    shutdown
  end
end
#run_ao_job(job) ⇒ Object
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/beanstalker/worker.rb', line 299
# Runs a rails job: evaluates the Ruby source stored under data/code in the
# job body, inside run_with_ruby_timeout_if_set.
#
# The class-level before_filter, when set, is called with the job first. A
# falsy answer skips the run, and in that case the job is not deleted here.
# After a successful run the job is deleted and the elapsed time is logged.
def run_ao_job(job)
  code = get_job_body(job)['data']['code']
  run_with_ruby_timeout_if_set(code, job) do
    started_at = Time.now
    filter = self.class.before_filter
    # Copy the stats up front: the job is deleted before the final log line.
    statistics = job.stats.dup
    allowed = !filter || filter.call(job)
    if allowed
      run_code(job.id, code)
      job.delete
      logger.info "Finished. Job id=#{statistics['id']}. Code '#{code}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.info "Not runnind due to :before_filter restriction. Job id=#{statistics['id']}. Code '#{code}'."
    end
  end
end
#run_code(job_id, code) ⇒ Object
317 318 319 |
# File 'lib/beanstalker/worker.rb', line 317
# Evaluates a job's Ruby source against @top_binding and returns the result.
# The job id is embedded in the file label used for backtraces.
#
# NOTE(review): this evals code taken from the queue, so anything able to
# put jobs on the tube can execute arbitrary Ruby in the worker — confirm
# the queue is only reachable by trusted producers.
def run_code(job_id, code)
  source_label = "(beanstalk job #{job_id})"
  eval(code, @top_binding, source_label, 1)
end
#run_mapped_job(job) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/beanstalker/worker.rb', line 274
# Runs a mapped job: asks the mapper for the callable registered for the
# job's kind and data/method, and calls it with data/body (an empty Hash when
# that is absent), inside run_with_ruby_timeout_if_set.
#
# The job is deleted in both cases, whether a mapping was found and run or
# the missing mapping was only logged as an error.
def run_mapped_job(job)
  body = get_job_body(job)
  kind = body['kind']
  data = body['data']
  method_name = data['method']
  job_desc = "#{kind}/#{method_name}"

  run_with_ruby_timeout_if_set(job_desc, job) do
    started_at = Time.now
    @map_job = @mapper && @mapper.method_for(kind, method_name)
    if @map_job
      @map_job.call(data['body'] || {})
      logger.info "Finished. Job id=#{job.stats['id']}. Mapped from '#{job_desc}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.error "Job id=#{job.stats['id']}. Mapping not found: '#{job_desc}'. Releases #{job.stats['releases']}. Deleting"
    end
    job.delete
  end
end
#run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/beanstalker/worker.rb', line 261
# Logs a one-line summary of the job and runs the block, returning its value.
#
# job_desc - text describing what is about to run (used in the log line).
# job      - the job; its stats supply id, age, releases and ttr.
# block    - the work to perform.
#
# When @options[:ruby_timeout] is set, the block runs under Timeout.timeout
# with 80% of the job's TTR, so Timeout::Error is raised when it overruns.
#
# job.stats is read once and reused for both the timeout and the log line.
# NOTE(review): each job.stats call is presumably a round trip to the
# server — confirm against the client library.
def run_with_ruby_timeout_if_set(job_desc, job, &block)
  stats = job.stats
  summary = "Job id=#{stats['id']}. Running '#{job_desc}'. Age #{stats['age']}, Releases #{stats['releases']}, TTR #{stats['ttr']}"

  if @options[:ruby_timeout]
    timeout = (stats['ttr'].to_f * 0.8)
    logger.info "TO=#{timeout} sec. #{summary}"
    Timeout.timeout(timeout) do
      block.call
    end
  else
    logger.info summary
    block.call
  end
end
#safe_dispatch(job) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/beanstalker/worker.rb', line 169
# Dispatches a job and shields the worker loop from its failures.
#
#   * Timeout::Error -> handle_timeout
#   * Interrupt      -> best-effort job.release, then the Interrupt is
#                       re-raised
#   * anything else  -> handle_error
#
# The logger is flushed in every case.
def safe_dispatch(job)
  dispatch(job)
rescue Timeout::Error
  handle_timeout(job)
rescue Interrupt => interrupt
  begin
    job.release
  rescue StandardError
    # Releasing is best effort; the interrupt still has to propagate.
    :ok
  end
  raise interrupt
rescue Exception => ex
  handle_error(job, ex)
ensure
  Daemonizer.flush_logger
end
#shutdown ⇒ Object
93 94 95 |
# File 'lib/beanstalker/worker.rb', line 93
# Called by run when the worker is interrupted; delegates to do_all_work and
# returns its result.
def shutdown
  do_all_work
end
#startup ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/beanstalker/worker.rb', line 74
# Makes the queue watch exactly the tubes configured in @options[:tube]
# ("default" when unset): missing tubes are watched and surplus ones are
# ignored. A failure to ignore a tube is logged and skipped. Finally the
# resulting tube list is logged and the logger flushed.
def startup
  wanted = Array.wrap(@options[:tube] || "default").map(&:to_s) # e.g. ["default"]
  watched = Beanstalker::Queue.queue.list_tubes_watched.values.flatten # e.g. ["default"]

  (wanted - watched).each { |tube| Beanstalker::Queue.queue.watch(tube) }

  (watched - wanted).each do |tube|
    begin
      Beanstalker::Queue.queue.ignore(tube)
    rescue Exception
      Daemonizer.logger.info "Failed to ignore tube: #{tube}"
    end
  end

  Daemonizer.logger.info "Using tubes: #{Beanstalker::Queue.queue.list_tubes_watched.values.flatten.join(',')}"
  Daemonizer.flush_logger
end