Module: Beanworker::Worker

Defined in:
lib/beanworker/worker.rb

Instance Method Summary

Instance Method Details

#fork_default(val) ⇒ Object



37
38
39
# File 'lib/beanworker/worker.rb', line 37

# Stores the default fork flag in @need_fork. The stored value is what
# #schedule passes to #work_job and what #get_one_job falls back to.
#
# @param val [Object] value to remember as the default fork flag
# @return [Object] the value that was stored
def fork_default(val)
  instance_variable_set(:@need_fork, val)
end

#get_one_job(connection) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/beanworker/worker.rb', line 41

# Reserves one job from +connection+, runs it through #work_job and deletes
# it from the queue on success.
#
# The handler name is the job's tube name with dots replaced by underscores,
# and the decoded body (job.ybody) is passed as the single argument. When
# the body is a Hash holding a '__fork__' entry, that entry is removed from
# the body and used as the fork flag; otherwise @need_fork is used.
#
# Any StandardError, including a failing reserve, is logged; the job is then
# buried if one was reserved. SystemExit is re-raised untouched.
#
# @param connection [#reserve] connection to take the job from
# @return [Object] not meaningful; callers only rely on the side effects
def get_one_job(connection)
  job = connection.reserve
  name = job.stats['tube'].gsub('.', '_')
  body = job.ybody
  # The flag has to be read from the body itself: Array#delete on the
  # wrapping argument list can never match a key stored inside the Hash.
  # NOTE(review): assumes the producer stores the flag under the string key
  # '__fork__' -- confirm against the enqueueing side.
  need_fork = body.delete('__fork__') if body.is_a?(Hash)
  # An explicit false must win over the default, so only nil falls back.
  need_fork = @need_fork if need_fork.nil?
  work_job(name, job.ttr, [body], need_fork)
  job.delete
rescue SystemExit
  raise
rescue => e
  # Build a new array so the exception's own backtrace is left unmodified
  # (and a missing backtrace does not raise).
  logger.error [e.message, *e.backtrace].join("\n")
  begin
    # job is nil when reserve itself failed; there is nothing to bury then.
    job.bury if job
  rescue => bury_error
    # Burying stays best-effort, but the failure is no longer invisible.
    logger.error "Failed to bury job: #{bury_error.message}"
  end
end

#logger ⇒ Object



81
82
83
# File 'lib/beanworker/worker.rb', line 81

# Returns the logger in use. If none has been assigned yet (or the stored
# value is falsy), a Logger writing to STDOUT is created and remembered.
#
# @return [Object] the assigned logger, or the lazily created default
def logger
  return @logger if @logger

  @logger = Logger.new(STDOUT)
end

#logger=(l) ⇒ Object



33
34
35
# File 'lib/beanworker/worker.rb', line 33

# Replaces the logger returned by #logger.
#
# @param l [Object] object to store in @logger
# @return [Object] the object that was stored
def logger=(l)
  instance_variable_set(:@logger, l)
end

#make_first_schedule(frequency, at) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/beanworker/worker.rb', line 72

# Works out when a scheduled job should run for the first time.
#
# +at+ is only honoured when +frequency+ is a whole number of days; it is
# parsed with Time.parse and pushed 24 hours ahead when the parsed time is
# not in the future. In every other case the first run is +frequency+
# seconds from now.
#
# @param frequency [Numeric] interval between runs, in seconds
# @param at [String, nil] time of day accepted by Time.parse
# @return [Time] moment of the first run
def make_first_schedule(frequency, at)
  one_day = 24 * 3600
  pinned_to_time_of_day = at && (frequency % one_day).zero?
  return Time.now + frequency unless pinned_to_time_of_day

  first_run = Time.parse(at)
  first_run += one_day unless first_run > Time.now
  first_run
end

#perform(num, *tubes) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/beanworker/worker.rb', line 8

# Starts +num+ worker threads. Each thread opens its own connection via
# Beanqueue.connect, watches every given tube (underscores in the name are
# turned into dots) and then processes jobs forever with #get_one_job.
#
# The threads are neither joined nor returned.
#
# @param num [Integer] how many worker threads to start
# @param tubes [Array<#to_s>] tube names to watch
# @return [Integer] +num+, the result of Integer#times
def perform(num, *tubes)
  num.times do
    Thread.new do
      queue = Beanqueue.connect(Beanworker.connection_config)
      tubes.each do |tube|
        queue.watch(tube.to_s.tr('_', '.'))
      end
      logger.info("Listening: #{tubes.inspect}")
      loop { get_one_job(queue) }
    end
  end
end

#schedule(name, frequency, opts = {}, *args) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/beanworker/worker.rb', line 21

# Runs the job +name+ every +frequency+ seconds in a background thread.
#
# The first run time comes from #make_first_schedule (which may honour
# opts[:at]). Each run goes through #work_job with the default fork flag
# (@need_fork) and a time limit of opts[:timeout], capped at
# +frequency+ - 5 seconds.
#
# A StandardError raised by a run is logged and the schedule carries on, so
# one failed or timed-out run does not end all future runs.
#
# @param name [Symbol, String] method to invoke for every run
# @param frequency [Numeric] seconds between runs
# @param opts [Hash] optional :at and :timeout settings
# @param args [Array] arguments passed to the job
# @return [Thread] the scheduling thread
def schedule(name, frequency, opts={}, *args)
  at = make_first_schedule(frequency, opts[:at])
  timeout_secs = [opts[:timeout] || frequency, frequency-5].min
  Thread.new do
    loop do
      # Kernel#sleep rejects negative intervals, which would otherwise kill
      # this thread whenever the target time has already passed.
      sleep([at - Time.now, 0].max)
      begin
        work_job(name, timeout_secs, args, @need_fork)
      rescue => e
        logger.error [e.message, *e.backtrace].join("\n")
      end
      at += frequency
    end
  end
end

#work_job(name, ttr, args, need_fork = false) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/beanworker/worker.rb', line 54

# Runs one job through #work_with_timeout, logging its start and finish.
#
# With +need_fork+ the job runs in a forked child process and this method
# waits for it. A child that does not exit successfully (exception, timeout,
# signal) is reported by raising, so a failure is visible to the caller just
# as it is in the non-fork path; the "Finished" line is only logged on
# success.
#
# @param name [Symbol, String] method to invoke
# @param ttr [Numeric] time limit handed to #work_with_timeout
# @param args [Array] arguments for the job method
# @param need_fork [Boolean] run the job in a child process when truthy
# @raise [RuntimeError] when the forked child exits unsuccessfully
# @return [Object] result of the final logger call
def work_job(name, ttr, args, need_fork=false)
  logger.info "Starting #{name} with #{args.inspect}"
  if need_fork
    pid = Process.fork { work_with_timeout(name, ttr, args) }
    _, status = Process.wait2(pid)
    # success? is nil for a signalled child and false for a non-zero exit.
    raise "Forked job #{name} failed: #{status.inspect}" unless status.success?
  else
    work_with_timeout(name, ttr, args)
  end
  logger.info "Finished #{name} with #{args.inspect}"
end

#work_with_timeout(name, ttr, args) ⇒ Object



66
67
68
69
70
# File 'lib/beanworker/worker.rb', line 66

# Invokes the method +name+ on self with +args+, limited by Timeout to
# +ttr+ - 1 seconds.
# NOTE(review): the one-second margin presumably lets the job stop before
# the queue's TTR expires -- confirm.
#
# @param name [Symbol, String] method to call via send
# @param ttr [Numeric] time-to-run in seconds
# @param args [Array] arguments splatted into the call
# @return [Object] whatever the invoked method returns
# @raise [Timeout::Error] when the call exceeds the limit
def work_with_timeout(name, ttr, args)
  limit = ttr - 1
  Timeout.timeout(limit) { send(name, *args) }
end