Module: Beanworker::Worker
- Defined in:
- lib/beanworker/worker.rb
Instance Method Summary
- #fork_default(val) ⇒ Object
- #get_one_job(connection) ⇒ Object
- #logger ⇒ Object
- #logger=(l) ⇒ Object
- #make_first_schedule(frequency, at) ⇒ Object
- #perform(num, *tubes) ⇒ Object
- #schedule(name, frequency, opts = {}, *args) ⇒ Object
- #work_job(name, ttr, args, need_fork = false) ⇒ Object
- #work_with_timeout(name, ttr, args) ⇒ Object
Instance Method Details
#fork_default(val) ⇒ Object
37 38 39 |
# File 'lib/beanworker/worker.rb', line 37
#
# Stores the default fork flag in @need_fork. get_one_job and schedule read
# that instance variable and hand it to work_job as its need_fork argument.
#
# @param val [Object] value to remember as the default fork flag
# @return [Object] the value that was stored
def fork_default(val)
  @need_fork = val
end
#get_one_job(connection) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/beanworker/worker.rb', line 41
#
# Reserves one job from +connection+, runs it through work_job and deletes it
# on success. The method to run is the job's tube name with every '.' replaced
# by '_'; its single argument is the job's +ybody+.
#
# When a StandardError is raised anywhere in that sequence, the message and
# backtrace are logged via logger.error and the job is buried on a best-effort
# basis (a failure to bury is ignored). SystemExit is re-raised untouched.
#
# @param connection [#reserve] queue connection to reserve the job from
# @return [Object, nil] the result of job.delete on success; on failure the
#   result of job.bury, or nil when burying was not possible
def get_one_job(connection)
  job = connection.reserve
  name, args = job.stats['tube'].gsub('.', '_'), [job.ybody]
  # NOTE(review): args is a one-element Array here, so this only finds
  # '__fork__' when the body itself equals that string. If the flag is meant
  # to be a key inside the body, this lookup never sees it - confirm intent.
  need_fork = args.delete('__fork__')
  work_job(name, job.ttr, args, need_fork.nil? ? @need_fork : need_fork)
  job.delete
rescue SystemExit
  raise
rescue => e
  # Build the log line without mutating the exception's own backtrace array;
  # the splat also copes with a nil backtrace.
  logger.error [e.message, *e.backtrace].join("\n")
  begin
    # job is nil when reserve itself failed; there is nothing to bury then.
    job.bury if job
  rescue StandardError
    nil
  end
end
#logger ⇒ Object
81 82 83 |
# File 'lib/beanworker/worker.rb', line 81
#
# Returns the logger held in @logger, creating a Logger that writes to STDOUT
# the first time it is needed.
#
# @return [Object] the memoised logger
def logger
  return @logger if @logger

  @logger = Logger.new(STDOUT)
end
#logger=(l) ⇒ Object
33 34 35 |
# File 'lib/beanworker/worker.rb', line 33
#
# Replaces the logger stored in @logger, which the logger reader returns.
#
# @param l [Object] logger to use from now on
# @return [Object] the logger that was assigned
def logger=(l)
  @logger = l
end
#make_first_schedule(frequency, at) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/beanworker/worker.rb', line 72
#
# Works out when a scheduled job should run for the first time.
#
# +at+ is only honoured when +frequency+ is a whole multiple of 24 hours: it
# is parsed with Time.parse, and if that moment is not in the future the
# result is pushed back by 24 hours. In every other case the first run is
# simply +frequency+ seconds from now.
#
# @param frequency [Numeric] interval between runs, in seconds
# @param at [String, nil] time of day understood by Time.parse
# @return [Time] moment of the first run
def make_first_schedule(frequency, at)
  one_day = 24 * 3600
  # Keep the modulo behind the `at` check so it is only evaluated when needed.
  daily = at && (frequency % one_day).zero?
  return Time.now + frequency unless daily

  first_run = Time.parse(at)
  first_run > Time.now ? first_run : first_run + one_day
end
#perform(num, *tubes) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/beanworker/worker.rb', line 8
#
# Starts +num+ worker threads. Each thread opens its own connection with
# Beanqueue.connect(Beanworker.connection_config), watches every tube in
# +tubes+ (with '_' in the given name replaced by '.'), logs what it listens
# to and then calls get_one_job in an endless loop.
#
# The threads are not joined here; the call returns straight away.
#
# @param num [Integer] number of worker threads to start
# @param tubes [Array<#to_s>] tube names, written with '_' instead of '.'
# @return [Integer] +num+, as returned by Integer#times
def perform(num, *tubes)
  num.times do
    Thread.new do
      connection = Beanqueue.connect(Beanworker.connection_config)
      tubes.each do |tube|
        connection.watch(tube.to_s.tr('_', '.'))
      end
      logger.info("Listening: #{tubes.inspect}")
      loop { get_one_job(connection) }
    end
  end
end
#schedule(name, frequency, opts = {}, *args) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/beanworker/worker.rb', line 21
#
# Runs the job +name+ every +frequency+ seconds on a background thread.
#
# The first run time comes from make_first_schedule(frequency, opts[:at]).
# Each run goes through work_job with a timeout of opts[:timeout] (defaulting
# to +frequency+), capped at frequency - 5 seconds, and with @need_fork as the
# fork flag.
#
# A run whose slot is already in the past starts immediately instead of
# sleeping, and a StandardError raised by a run is logged through logger.error
# so that later runs still happen.
#
# @param name [Symbol, String] method to invoke for every run
# @param frequency [Numeric] seconds between runs
# @param opts [Hash] :at (time of day for the first run) and :timeout (seconds)
# @param args [Array] arguments handed to the job on every run
# @return [Thread] the scheduler thread
def schedule(name, frequency, opts={}, *args)
  at = make_first_schedule(frequency, opts[:at])
  timeout_secs = [opts[:timeout] || frequency, frequency - 5].min
  Thread.new do
    loop do
      # Kernel#sleep rejects negative intervals, so only sleep when the next
      # slot is still ahead of us.
      delay = at - Time.now
      sleep(delay) if delay > 0
      begin
        work_job(name, timeout_secs, args, @need_fork)
      rescue StandardError => e
        # One failed or timed-out run must not end the whole schedule.
        logger.error [e.message, *e.backtrace].join("\n")
      end
      at += frequency
    end
  end
end
#work_job(name, ttr, args, need_fork = false) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/beanworker/worker.rb', line 54
#
# Runs one job through work_with_timeout, logging a "Starting" line before and
# a "Finished" line after it.
#
# With +need_fork+ the job runs in a forked child process and this method
# waits for it. A child that does not exit successfully (the job raised, timed
# out, or the process was killed) raises here, so callers see the failure just
# as they do when the job runs in-process.
#
# @param name [Symbol, String] method to invoke
# @param ttr [Numeric] time-to-run passed on to work_with_timeout
# @param args [Array] arguments for the job method
# @param need_fork [Object] truthy to run the job in a child process
# @return [Object] result of the final logger.info call
# @raise [RuntimeError] when the forked child does not exit successfully
def work_job(name, ttr, args, need_fork=false)
  logger.info "Starting #{name} with #{args.inspect}"
  if need_fork
    pid = Process.fork { work_with_timeout(name, ttr, args) }
    _, status = Process.wait2(pid)
    # success? is nil when the child was terminated by a signal; treat that
    # as a failure too.
    raise "#{name} failed in forked child (#{status.inspect})" unless status.success?
  else
    work_with_timeout(name, ttr, args)
  end
  logger.info "Finished #{name} with #{args.inspect}"
end
#work_with_timeout(name, ttr, args) ⇒ Object
66 67 68 69 70 |
# File 'lib/beanworker/worker.rb', line 66
#
# Invokes the method +name+ on self with +args+, aborting it through Timeout
# after ttr - 1 seconds.
# NOTE(review): the one-second margin presumably lets the job stop before the
# queue's own time-to-run expires - confirm.
#
# @param name [Symbol, String] method to invoke via send
# @param ttr [Numeric] time-to-run in seconds
# @param args [Array] arguments splatted into the call
# @return [Object] whatever the invoked method returns
# @raise [Timeout::Error] when the call does not finish in time
def work_with_timeout(name, ttr, args)
  Timeout.timeout(ttr - 1) { send(name, *args) }
end