Class: AsyncObserver::Worker
- Inherits:
-
Object
- Object
- AsyncObserver::Worker
- Defined in:
- lib/async_observer/worker.rb
Constant Summary collapse
- SLEEP_TIME =
Seconds to sleep before retrying after a failed attempt to get a job (source note: Rails loads this file twice).
60
Class Attribute Summary collapse
-
.before_filter ⇒ Object
Returns the value of attribute before_filter.
-
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
-
.finish ⇒ Object
Returns the value of attribute finish.
- .handle ⇒ Object
Class Method Summary collapse
- .before_reserve(&block) ⇒ Object
- .before_reserves ⇒ Object
- .default_handle_error(job, ex) ⇒ Object
- .error_handler(&block) ⇒ Object
- .run_before_reserve ⇒ Object
Instance Method Summary collapse
- #async_observer_job?(job) ⇒ Boolean
- #brief?(t1, t2) ⇒ Boolean
- #dispatch(job) ⇒ Object
- #do_all_work ⇒ Object
- #flush_logger ⇒ Object
- #get_job ⇒ Object
- #handle_error(job, ex) ⇒ Object
-
#initialize(top_binding, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #logger ⇒ Object
- #main_loop ⇒ Object
- #q_hint ⇒ Object
-
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving.
- #run ⇒ Object
- #run_ao_job(job) ⇒ Object
- #run_code(job) ⇒ Object
- #run_other(job) ⇒ Object
- #safe_dispatch(job) ⇒ Object
- #shutdown ⇒ Object
- #startup ⇒ Object
Constructor Details
#initialize(top_binding, options = {}) ⇒ Worker
Returns a new instance of Worker.
57 58 59 60 61 62 63 64 |
# File 'lib/async_observer/worker.rb', line 57
#
# Builds a worker that is not yet stopped.
#
# @param top_binding [Binding] binding stored in @top_binding; #run_code
#   evaluates job code against it.
# @param options [Hash] worker options. :servers, when present, is handed to
#   Beanstalk::Pool.new and the resulting pool is installed as
#   AsyncObserver::Queue.queue. #startup reads :tube from the same hash.
def initialize(top_binding, options = {})
  @top_binding = top_binding
  @stop = false
  @options = options
  if @options && @options[:servers]
    AsyncObserver::Queue.queue = Beanstalk::Pool.new(@options[:servers])
  end
end
Class Attribute Details
.before_filter ⇒ Object
Returns the value of attribute before_filter.
29 30 31 |
# File 'lib/async_observer/worker.rb', line 29
#
# Returns the value of attribute before_filter.
# #run_ao_job calls this value with the job, when it is set, before the
# job's code runs.
def before_filter
  @before_filter
end
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
28 29 30 |
# File 'lib/async_observer/worker.rb', line 28
#
# Returns the value of attribute custom_error_handler.
# When set, #handle_error calls it with (job, ex) instead of the default
# error handling.
def custom_error_handler
  @custom_error_handler
end
.finish ⇒ Object
Returns the value of attribute finish.
27 28 29 |
# File 'lib/async_observer/worker.rb', line 27
#
# Returns the value of attribute finish.
# #do_all_work calls this value (with no arguments) when it is set.
def finish
  @finish
end
.handle ⇒ Object
32 33 34 |
# File 'lib/async_observer/worker.rb', line 32
#
# Returns the custom job handler stored in @handle.
#
# @return [Object] the handler; #run_other invokes it with `call(job)`.
# @raise [RuntimeError] 'no custom handler is defined' when @handle is unset.
def handle
  @handle || raise('no custom handler is defined')
end
Class Method Details
.before_reserve(&block) ⇒ Object
44 45 46 |
# File 'lib/async_observer/worker.rb', line 44
#
# Registers a block to be run by .run_before_reserve.
#
# @yield the hook; it is later invoked with no arguments.
# @return [Array] the list of registered hooks, including the new one.
def before_reserve(&block)
  before_reserves << block
end
.before_reserves ⇒ Object
40 41 42 |
# File 'lib/async_observer/worker.rb', line 40
#
# Returns the list of hooks added through .before_reserve, creating an empty
# list on first use.
#
# @return [Array] the same array object on every call.
def before_reserves
  @before_reserves ||= []
end
.default_handle_error(job, ex) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/async_observer/worker.rb', line 175
#
# Default reaction to a failed job: log the failure, then bury the job if
# its 'releases' stat is above 10, otherwise call job.decay.
#
# @param job [Object] the failed job; must respond to server, id, stats,
#   bury and decay.
# @param ex [Exception] the error raised while running the job.
# @return [Object, nil] nil when Beanstalk::UnexpectedResponse was raised
#   along the way (it is deliberately swallowed).
def self.default_handle_error(job, ex)
  # This is a class-level method, while #logger is an instance method, so it
  # cannot be called from here; use the same fallback chain directly.
  log = $logger || RAILS_DEFAULT_LOGGER
  log.info "Job failed: #{job.server}/#{job.id}"
  # backtrace is nil for an exception object that was never raised.
  log.info("#{ex.class}: #{ex}\n" + (ex.backtrace || []).join("\n"))
  if job.stats['releases'] > 10
    job.bury
    log.info "BURY job due to many releases"
  else
    job.decay
  end
rescue Beanstalk::UnexpectedResponse
  # Best effort: the error is dropped without logging.
end
.error_handler(&block) ⇒ Object
36 37 38 |
# File 'lib/async_observer/worker.rb', line 36
#
# Installs the given block as the custom error handler.
#
# @yield [job, ex] #handle_error calls the stored block with the failed job
#   and the exception.
# @return [Proc, nil] the block that was stored.
def error_handler(&block)
  self.custom_error_handler = block
end
.run_before_reserve ⇒ Object
48 49 50 |
# File 'lib/async_observer/worker.rb', line 48
#
# Calls every registered before_reserve hook, in registration order, with no
# arguments. #get_job runs this before each reserve attempt.
#
# @return [Array] the list of hooks.
def run_before_reserve
  before_reserves.each { |hook| hook.call }
end
Instance Method Details
#async_observer_job?(job) ⇒ Boolean
208 209 210 |
# File 'lib/async_observer/worker.rb', line 208
#
# Tells whether the job was queued as an async observer job, i.e. whether
# job.ybody[:type] is :rails.
#
# @param job [Object] a job responding to ybody.
# @return [Boolean] false as well when reading the body raises a
#   StandardError.
def async_observer_job?(job)
  job.ybody[:type] == :rails
rescue
  false
end
#brief?(t1, t2) ⇒ Boolean
108 109 110 |
# File 'lib/async_observer/worker.rb', line 108
#
# Tells whether the two instants are close together: the difference, scaled
# by 100 and truncated to an integer, must be below 10 in absolute value.
# With times in seconds that is a gap of less than a tenth of a second.
#
# @param t1 [Time, Numeric] first instant.
# @param t2 [Time, Numeric] second instant.
# @return [Boolean]
def brief?(t1, t2)
  ((t2 - t1) * 100).to_i.abs < 10
end
#dispatch(job) ⇒ Object
137 138 139 140 141 |
# File 'lib/async_observer/worker.rb', line 137
#
# Routes a job to the matching runner after asking ActiveRecord to verify
# its active connections.
#
# @param job [Object] the reserved job.
# @return [Object] the result of #run_ao_job for async observer jobs,
#   otherwise the result of #run_other.
def dispatch(job)
  ActiveRecord::Base.verify_active_connections!
  return run_ao_job(job) if async_observer_job?(job)

  run_other(job)
end
#do_all_work ⇒ Object
217 218 219 220 221 |
# File 'lib/async_observer/worker.rb', line 217
#
# Logs that running jobs are being finished, then calls the class-level
# `finish` callback when one is set.
#
# @return [Object, nil] the callback's result, or nil without a callback.
def do_all_work
  logger.info 'finishing all running jobs. interrupt again to kill them.'
  callback = self.class.finish
  callback.call if callback
end
#flush_logger ⇒ Object
160 161 162 163 164 165 |
# File 'lib/async_observer/worker.rb', line 160
#
# Flushes the logger when it supports flushing; does nothing otherwise.
#
# @return [Object, nil] the result of logger.flush, or nil when skipped.
def flush_logger
  if defined?(logger) && logger.respond_to?(:flush)
    logger.flush
  end
end
#get_job ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/async_observer/worker.rb', line 112
#
# Blocks until a job has been reserved. Each attempt connects the queue,
# runs the class-level before_reserve hooks and then reserves through
# #reserve_and_set_hint.
#
# @return [Object] the reserved job.
# @raise [SignalException] signals (Interrupt included) are re-raised
#   untouched so the caller can shut down.
def get_job
  loop do
    begin
      AsyncObserver::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue SignalException
      # Interrupt is a SignalException, so one clause covers both.
      raise
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => ex
      # NOTE(review): this also catches non-StandardError exceptions such
      # as SystemExit; kept as is because callers may rely on the worker
      # surviving any failure here.
      @q_hint = nil # in case there's something wrong with this conn
      logger.info(
        "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
      logger.info 'something is wrong. We failed to get a job.'
      logger.info "sleeping for #{SLEEP_TIME}s..."
      sleep(SLEEP_TIME)
    end
  end
end
#handle_error(job, ex) ⇒ Object
167 168 169 170 171 172 173 |
# File 'lib/async_observer/worker.rb', line 167
#
# Hands a failed job to the class-level custom error handler when one is
# set, and to the class-level default_handle_error otherwise.
#
# @param job [Object] the job that failed.
# @param ex [Exception] the error raised while running it.
# @return [Object] whatever the chosen handler returns.
def handle_error(job, ex)
  handler = self.class.custom_error_handler
  if handler
    handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end
#logger ⇒ Object
53 54 55 |
# File 'lib/async_observer/worker.rb', line 53
#
# Returns the logger used by the worker: the global $logger when it is set,
# otherwise RAILS_DEFAULT_LOGGER.
#
# @return [Object] the logger.
def logger
  $logger || RAILS_DEFAULT_LOGGER
end
#main_loop ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/async_observer/worker.rb', line 66
#
# Reserves and dispatches jobs one after another until @stop is set.
# A TERM signal sets @stop; the flag is only checked before each job is
# fetched.
#
# @return [nil]
def main_loop
  trap('TERM') { @stop = true }
  loop do
    break if @stop
    safe_dispatch(get_job)
  end
end
#q_hint ⇒ Object
92 93 94 |
# File 'lib/async_observer/worker.rb', line 92
#
# Returns the connection to reserve from next: the remembered hint when
# there is one, otherwise AsyncObserver::Queue.queue.
#
# @return [Object] an object #reserve_and_set_hint calls `reserve` on.
def q_hint
  @q_hint || AsyncObserver::Queue.queue
end
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.
100 101 102 103 104 105 106 |
# File 'lib/async_observer/worker.rb', line 100
#
# Reserves a job from #q_hint and updates the hint for the next call: the
# job's connection is remembered only when the reserve was brief (see
# #brief?) and produced a job; otherwise the hint is cleared.
#
# @return [Object] the reserved job.
# @raise whatever `reserve` raises; the hint is cleared in that case too,
#   because the ensure clause runs with no job.
def reserve_and_set_hint
  t1 = Time.now.utc
  job = q_hint.reserve
ensure
  t2 = Time.now.utc
  @q_hint = (brief?(t1, t2) && job) ? job.conn : nil
end
#run ⇒ Object
85 86 87 88 89 90 |
# File 'lib/async_observer/worker.rb', line 85
#
# Entry point of the worker: runs #startup, then #main_loop. An Interrupt
# raised by either triggers #shutdown instead of propagating.
#
# @return [Object] the result of #main_loop, or of #shutdown after an
#   Interrupt.
def run
  startup
  main_loop
rescue Interrupt
  shutdown
end
#run_ao_job(job) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/async_observer/worker.rb', line 187
#
# Runs an async observer job: calls the class-level before_filter (when
# set) with the job, evaluates the job's code, and deletes the job — before
# running when ybody[:delete_first] is set, after a successful run
# otherwise.
#
# An ActiveRecord::RecordNotFound raised along the way is absorbed: unless
# the job was deleted first, it is deleted when job.age is above 60 and
# given job.decay otherwise.
#
# @param job [Object] the reserved job.
def run_ao_job(job)
  logger.info 'running as async observer job'
  filter = self.class.before_filter
  filter.call(job) if filter
  job.delete if job.ybody[:delete_first]
  run_code(job)
  job.delete unless job.ybody[:delete_first]
rescue ActiveRecord::RecordNotFound
  unless job.ybody[:delete_first]
    if job.age > 60
      job.delete # it's old; this error is most likely permanent
    else
      job.decay # it could be replication delay so retry quietly
    end
  end
end
#run_code(job) ⇒ Object
204 205 206 |
# File 'lib/async_observer/worker.rb', line 204
#
# Evaluates the Ruby source held in job.ybody[:code] against the binding
# given to the constructor. Errors raised by that code are reported under
# the file name "(beanstalk job <id>)", starting at line 1.
#
# NOTE(review): this runs whatever code the job body carries; it is only
# safe when everything able to put jobs on the queue is trusted.
#
# @param job [Object] a job responding to ybody and id.
# @return [Object] the value of the evaluated code.
def run_code(job)
  eval(job.ybody[:code], @top_binding, "(beanstalk job #{job.id})", 1)
end
#run_other(job) ⇒ Object
212 213 214 215 |
# File 'lib/async_observer/worker.rb', line 212
#
# Runs a job that is not an async observer job by passing it to the
# class-level custom handler.
#
# @param job [Object] the reserved job.
# @return [Object] the handler's result.
def run_other(job)
  logger.info 'trying custom handler'
  self.class.handle.call(job)
end
#safe_dispatch(job) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/async_observer/worker.rb', line 143
#
# Logs the job and its stats, then dispatches it with error handling
# around the dispatch itself. The logger is flushed whatever the outcome.
#
# @param job [Object] the reserved job; must respond to body and stats.
# @return [Object] the result of #dispatch, or of #handle_error when the
#   dispatch raised.
# @raise [Interrupt] re-raised after trying to release the job.
def safe_dispatch(job)
  logger.info "got #{job.inspect}:\n" + job.body
  job.stats.each do |key, value|
    logger.info "#{key}=#{value}"
  end
  begin
    return dispatch(job)
  rescue Interrupt
    # Put the job back before shutting down; a failing release must not
    # mask the interrupt.
    begin
      job.release
    rescue
      :ok
    end
    raise
  rescue Exception => ex
    handle_error(job, ex)
  ensure
    flush_logger
  end
end
#shutdown ⇒ Object
81 82 83 |
# File 'lib/async_observer/worker.rb', line 81
#
# Shuts the worker down by delegating to #do_all_work. #run calls this
# after an Interrupt.
#
# @return [Object] the result of #do_all_work.
def shutdown
  do_all_work
end
#startup ⇒ Object
74 75 76 77 78 79 |
# File 'lib/async_observer/worker.rb', line 74
#
# Prepares the worker: picks the tube from the :tube option (falling back
# to "default"), logs it, tells AsyncObserver::Queue.queue to watch it and
# flushes the logger.
#
# @return [Object] the result of #flush_logger.
def startup
  tube = @options[:tube] || "default"
  logger.info "Using tube #{tube}"
  AsyncObserver::Queue.queue.watch(tube)
  flush_logger
end