Class: AsyncObserver::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/async_observer/worker.rb

Constant Summary collapse

SLEEP_TIME =

rails loads this file twice

60

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(top_binding, options = {}) ⇒ Worker

Returns a new instance of Worker.



57
58
59
60
61
62
63
64
# File 'lib/async_observer/worker.rb', line 57

def initialize(top_binding, options = {})
  @top_binding = top_binding
  @stop = false
  @options = options
  if @options && @options[:servers]
    AsyncObserver::Queue.queue = Beanstalk::Pool.new(@options[:servers])
  end
end

Class Attribute Details

.before_filterObject

Returns the value of attribute before_filter.



29
30
31
# File 'lib/async_observer/worker.rb', line 29

def before_filter
  @before_filter
end

.custom_error_handlerObject

Returns the value of attribute custom_error_handler.



28
29
30
# File 'lib/async_observer/worker.rb', line 28

def custom_error_handler
  @custom_error_handler
end

.finishObject

Returns the value of attribute finish.



27
28
29
# File 'lib/async_observer/worker.rb', line 27

def finish
  @finish
end

.handleObject



32
33
34
# File 'lib/async_observer/worker.rb', line 32

def handle
  @handle or raise 'no custom handler is defined'
end

Class Method Details

.before_reserve(&block) ⇒ Object



44
45
46
# File 'lib/async_observer/worker.rb', line 44

def before_reserve(&block)
  before_reserves << block
end

.before_reservesObject



40
41
42
# File 'lib/async_observer/worker.rb', line 40

def before_reserves
  @before_reserves ||= []
end

.default_handle_error(job, ex) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
# File 'lib/async_observer/worker.rb', line 175

def self.default_handle_error(job, ex)
  logger.info "Job failed: #{job.server}/#{job.id}"
  logger.info("#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
  if job.stats['releases'] > 10
    job.bury
    logger.info "BURY job due to many releases"
  else
    job.decay
  end
rescue Beanstalk::UnexpectedResponse
end

.error_handler(&block) ⇒ Object



36
37
38
# File 'lib/async_observer/worker.rb', line 36

def error_handler(&block)
  self.custom_error_handler = block
end

.run_before_reserveObject



48
49
50
# File 'lib/async_observer/worker.rb', line 48

def run_before_reserve
  before_reserves.each {|b| b.call}
end

Instance Method Details

#async_observer_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


208
209
210
# File 'lib/async_observer/worker.rb', line 208

def async_observer_job?(job)
  begin job.ybody[:type] == :rails rescue false end
end

#brief?(t1, t2) ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/async_observer/worker.rb', line 108

def brief?(t1, t2)
  ((t2 - t1) * 100).to_i.abs < 10
end

#dispatch(job) ⇒ Object



137
138
139
140
141
# File 'lib/async_observer/worker.rb', line 137

def dispatch(job)
  ActiveRecord::Base.verify_active_connections!
  return run_ao_job(job) if async_observer_job?(job)
  return run_other(job)
end

#do_all_workObject



217
218
219
220
221
# File 'lib/async_observer/worker.rb', line 217

def do_all_work
  logger.info 'finishing all running jobs. interrupt again to kill them.'
  f = self.class.finish
  f.call if f
end

#flush_loggerObject



160
161
162
163
164
165
# File 'lib/async_observer/worker.rb', line 160

def flush_logger
  if defined?(logger) &&
      logger.respond_to?(:flush)
    logger.flush
  end
end

#get_jobObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/async_observer/worker.rb', line 112

def get_job
  loop do
    begin
      AsyncObserver::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue Interrupt => ex
      raise ex
    rescue SignalException => ex
      raise ex
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => ex
      @q_hint = nil # in case there's something wrong with this conn
      logger.info(
        "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
      logger.info 'something is wrong. We failed to get a job.'
      logger.info "sleeping for #{SLEEP_TIME}s..."
      sleep(SLEEP_TIME)
    end
  end
end

#handle_error(job, ex) ⇒ Object



167
168
169
170
171
172
173
# File 'lib/async_observer/worker.rb', line 167

def handle_error(job, ex)
  if self.class.custom_error_handler
    self.class.custom_error_handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end

#loggerObject



53
54
55
# File 'lib/async_observer/worker.rb', line 53

def logger
  $logger or RAILS_DEFAULT_LOGGER
end

#main_loopObject



66
67
68
69
70
71
72
# File 'lib/async_observer/worker.rb', line 66

def main_loop
  trap('TERM') { @stop = true }
  loop do
    break if @stop
    safe_dispatch(get_job)
  end
end

#q_hintObject



92
93
94
# File 'lib/async_observer/worker.rb', line 92

def q_hint
  @q_hint || AsyncObserver::Queue.queue
end

#reserve_and_set_hintObject

This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.



100
101
102
103
104
105
106
# File 'lib/async_observer/worker.rb', line 100

def reserve_and_set_hint
  t1 = Time.now.utc
  return job = q_hint.reserve
ensure
  t2 = Time.now.utc
  @q_hint = if brief?(t1, t2) and job then job.conn else nil end
end

#runObject



85
86
87
88
89
90
# File 'lib/async_observer/worker.rb', line 85

def run
  startup
  main_loop
rescue Interrupt
  shutdown
end

#run_ao_job(job) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/async_observer/worker.rb', line 187

def run_ao_job(job)
  logger.info 'running as async observer job'
  f = self.class.before_filter
  f.call(job) if f
  job.delete if job.ybody[:delete_first]
  run_code(job)
  job.delete unless job.ybody[:delete_first]
rescue ActiveRecord::RecordNotFound => ex
  unless job.ybody[:delete_first]
    if job.age > 60
      job.delete # it's old; this error is most likely permanent
    else
      job.decay # it could be replication delay so retry quietly
    end
  end
end

#run_code(job) ⇒ Object



204
205
206
# File 'lib/async_observer/worker.rb', line 204

def run_code(job)
  eval(job.ybody[:code], @top_binding, "(beanstalk job #{job.id})", 1)
end

#run_other(job) ⇒ Object



212
213
214
215
# File 'lib/async_observer/worker.rb', line 212

def run_other(job)
  logger.info 'trying custom handler'
  self.class.handle.call(job)
end

#safe_dispatch(job) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/async_observer/worker.rb', line 143

def safe_dispatch(job)
  logger.info "got #{job.inspect}:\n" + job.body
  job.stats.each do |k,v|
    logger.info "#{k}=#{v}"
  end
  begin
    return dispatch(job)
  rescue Interrupt => ex
    begin job.release rescue :ok end
    raise ex
  rescue Exception => ex
    handle_error(job, ex)
  ensure
    flush_logger
  end
end

#shutdownObject



81
82
83
# File 'lib/async_observer/worker.rb', line 81

def shutdown
  do_all_work
end

#startupObject



74
75
76
77
78
79
# File 'lib/async_observer/worker.rb', line 74

def startup
  tube = @options[:tube] || "default"
  logger.info "Using tube #{tube}"
  AsyncObserver::Queue.queue.watch(tube)
  flush_logger
end