Module: Stalker

Extended by:
Stalker
Included in:
Stalker
Defined in:
lib/stalker.rb

Defined Under Namespace

Classes: BadURL, JobTimeout, NoJobsDefined, NoSuchJob

Instance Method Summary

Instance Method Details

#all_jobs ⇒ Object



182
183
184
# File 'lib/stalker.rb', line 182

# Lists the names of every job that currently has a handler.
#
# Returns an empty array when the handler table has not been created yet or
# has been reset to nil (as +clear!+ does), instead of raising.
#
# @return [Array] the keys of the handler table
def all_jobs
  return [] unless defined?(@@handlers) && @@handlers
  @@handlers.keys
end

#beanstalk ⇒ Object



149
150
151
# File 'lib/stalker.rb', line 149

# Returns the shared Beaneater pool, creating it from +beanstalk_addresses+
# the first time it is needed and reusing it afterwards.
#
# @return [Beaneater::Pool] the memoized pool
def beanstalk
  return @@beanstalk if defined?(@@beanstalk) && @@beanstalk
  @@beanstalk = Beaneater::Pool.new(beanstalk_addresses)
end

#beanstalk_addresses ⇒ Object



160
161
162
163
# File 'lib/stalker.rb', line 160

# Converts the configured beanstalk URL list into "host:port" addresses.
#
# The string returned by +beanstalk_url+ may contain several URLs separated
# by whitespace and/or commas. Blank entries are discarded: splitting on a
# regexp keeps a leading empty string when the list starts with a separator,
# and that empty string would otherwise be rejected as a bad URL.
#
# @return [Array<String>] one "host:port" string per URL, in order
# @raise [BadURL] propagated from +beanstalk_host_and_port+
def beanstalk_addresses
  uris = beanstalk_url.split(/[\s,]+/).reject(&:empty?)
  uris.map { |uri| beanstalk_host_and_port(uri) }
end

#beanstalk_host_and_port(uri_string) ⇒ Object

Raises: BadURL



165
166
167
168
169
# File 'lib/stalker.rb', line 165

# Turns a single beanstalk URL into a "host:port" address string.
#
# @param uri_string [String] a URL such as "beanstalk://localhost:11300/"
# @return [String] "host:port"; the port falls back to 11300 when the URL
#   does not specify one
# @raise [BadURL] when the scheme is not "beanstalk" or the URL has no host
def beanstalk_host_and_port(uri_string)
  uri = URI.parse(uri_string)
  raise(BadURL, uri_string) if uri.scheme != 'beanstalk'

  # An opaque URI such as "beanstalk:localhost" parses with a nil host; fail
  # loudly instead of returning an address like ":11300".
  host = uri.host
  raise(BadURL, uri_string) if host.nil? || host.empty?

  "#{host}:#{uri.port || 11300}"
end

#beanstalk_url ⇒ Object



153
154
155
156
# File 'lib/stalker.rb', line 153

# Resolves the beanstalk URL string to use.
#
# Precedence: the class-level URL when it is defined and truthy, then the
# BEANSTALK_URL environment variable, then 'beanstalk://localhost/'.
#
# @return [String] the URL (or separator-delimited URL list)
def beanstalk_url
  explicit = defined?(@@url) && @@url
  return explicit if explicit

  ENV.fetch('BEANSTALK_URL', 'beanstalk://localhost/')
end

#before(&block) ⇒ Object



36
37
38
39
# File 'lib/stalker.rb', line 36

# Appends a block to the list of before-handlers.
#
# +work_one_job+ calls each before-handler with the job name ahead of the
# job's own handler.
#
# @return [Array] the list of before-handlers, including the new block
def before(&block)
  @@before_handlers = [] unless defined?(@@before_handlers) && @@before_handlers
  @@before_handlers << block
end

#clear! ⇒ Object



190
191
192
193
194
# File 'lib/stalker.rb', line 190

# Resets the job handlers, the before-handlers and the error handler to nil.
#
# @return [nil]
def clear!
  @@handlers = @@before_handlers = @@error_handler = nil
end

#connect(url) ⇒ Object



10
11
12
13
# File 'lib/stalker.rb', line 10

# Points the module at +url+ and returns a pool connected to it.
#
# +beanstalk+ memoizes its pool, so without intervention a pool built for a
# previous URL would keep being returned after the URL changes. When a pool
# already exists and the effective URL differs from +url+, that pool is closed
# (if it responds to +close+) and discarded so the next one is built from the
# new URL.
#
# @param url [String] beanstalk URL, or several separated by whitespace/commas
# @return [Beaneater::Pool] the pool returned by +beanstalk+
def connect(url)
  if defined?(@@beanstalk) && @@beanstalk && beanstalk_url != url
    @@beanstalk.close if @@beanstalk.respond_to?(:close)
    @@beanstalk = nil
  end

  @@url = url
  beanstalk
end

#enqueue(job, args = {}, opts = {}) ⇒ Object



15
16
17
18
19
20
21
22
23
24
# File 'lib/stalker.rb', line 15

# Puts a job on the tube named after it.
#
# The body sent to beanstalk is the JSON encoding of [job, args].
#
# @param job [String] job name; also used as the tube name
# @param args [Hash] arguments serialized alongside the job name
# @param opts [Hash] optional :pri (default 65536), :delay (converted with
#   to_i, negative values become 0) and :ttr (default 120)
# @return [Object] the :id entry of the value returned by the tube's +put+
def enqueue(job, args={}, opts={})
  priority    = opts[:pri] || 65536
  wait        = opts[:delay].to_i
  wait        = 0 if wait < 0
  time_to_run = opts[:ttr] || 120

  tube   = beanstalk.tubes[job]
  queued = tube.put([ job, args ].to_json, pri: priority, delay: wait, ttr: time_to_run)
  queued[:id]
rescue Beaneater::NotConnected => e
  # Logs the failure and exits the process.
  failed_connection(e)
end

#error(&blk) ⇒ Object



41
42
43
# File 'lib/stalker.rb', line 41

# Stores the block to invoke when working a job raises; any previously stored
# block is replaced.
#
# +work_one_job+ calls it with the exception alone when its arity is 1,
# otherwise with the exception, the job name and the job arguments.
#
# @return [Proc, nil] the stored block
def error(&blk)
  @@error_handler = blk
end

#error_handler ⇒ Object



186
187
188
# File 'lib/stalker.rb', line 186

# Returns the block stored by +error+, if any.
#
# Returns nil when no error handler has ever been assigned. Reading the
# undefined class variable would raise NameError, and because
# +work_one_job+ calls this from its rescue clause, that NameError would
# replace the job's real exception.
#
# @return [Proc, nil]
def error_handler
  defined?(@@error_handler) ? @@error_handler : nil
end

#exception_message(e) ⇒ Object



171
172
173
174
175
176
177
178
179
180
# File 'lib/stalker.rb', line 171

# Formats an exception as a multi-line string for logging.
#
# The first line is "Exception <class> -> <message>". Each backtrace frame
# follows on its own line, indented by three spaces, with the current working
# directory stripped from the expanded path.
#
# @param e [Exception] the exception to describe
# @return [String] the formatted message (header only when there is no
#   backtrace)
def exception_message(e)
  msg = [ "Exception #{e.class} -> #{e.message}" ]

  base = File.expand_path(Dir.pwd) + '/'
  # backtrace is nil for an exception that was never raised.
  Array(e.backtrace).each do |t|
    # A String pattern is matched literally, so directory names containing
    # regexp metacharacters ("+", "(", "[" ...) are still stripped.
    msg << "   #{File.expand_path(t).gsub(base, '')}"
  end

  msg.join("\n")
end

#failed_connection(e) ⇒ Object



115
116
117
118
119
120
# File 'lib/stalker.rb', line 115

# Reports a failed beanstalk connection through +log_error+ and terminates the
# process with exit status 1.
#
# @param e [Exception] the connection error to describe
def failed_connection(e)
  [
    exception_message(e),
    "*** Failed connection to #{beanstalk_url}",
    "*** Check that beanstalkd is running (or set a different BEANSTALK_URL)"
  ].each { |line| log_error line }

  exit 1
end

#job(j, &block) ⇒ Object



26
27
28
29
# File 'lib/stalker.rb', line 26

# Registers +block+ as the handler for the job named +j+, replacing any
# handler already stored under that name.
#
# @param j [String] job name
# @return [Proc, nil] the stored block
def job(j, &block)
  @@handlers = {} unless defined?(@@handlers) && @@handlers
  @@handlers[j] = block
end

#log(msg) ⇒ Object



141
142
143
# File 'lib/stalker.rb', line 141

# Writes +msg+ to standard output.
#
# @param msg [Object] the message to print
# @return [nil]
def log(msg)
  $stdout.puts(msg)
end

#log_error(msg) ⇒ Object



145
146
147
# File 'lib/stalker.rb', line 145

# Writes +msg+ to STDERR.
#
# @param msg [Object] the message to print
def log_error(msg)
  STDERR.puts msg
end

#log_job_begin(name, args) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/stalker.rb', line 122

# Logs that work on a job is starting and records the start time.
#
# The logged line is "Working <name> (<k>=<v> ...)"; when +args+ is empty the
# parenthesised part is an empty string, which leaves a trailing space.
#
# @param name [String] job name
# @param args [Hash] job arguments
# @return [Time] the start time stored in @job_begun
def log_job_begin(name, args)
  args_flat =
    if args.empty?
      ''
    else
      pairs = args.map { |key, value| "#{key}=#{value}" }
      '(' + pairs.join(' ') + ')'
    end

  log [ "Working", name, args_flat ].join(' ')
  @job_begun = Time.now
end

#log_job_end(name, failed = false) ⇒ Object



135
136
137
138
139
# File 'lib/stalker.rb', line 135

# Logs how long a job took, measured from the time stored in @job_begun by
# +log_job_begin+.
#
# @param name [String] job name
# @param failed [Object] any truthy value appends " (failed)" to the line
def log_job_end(name, failed=false)
  elapsed_ms = ((Time.now - @job_begun).to_f * 1000).to_i
  suffix = failed ? ' (failed)' : ''
  log "Finished #{name} in #{elapsed_ms}ms #{suffix}"
end

#prep(jobs = nil) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/stalker.rb', line 48

# Prepares the connection for working the given jobs.
#
# Validates that every requested job has a handler, then watches one tube per
# job and ignores every other watched tube.
#
# @param jobs [Array<String>, nil] job names to work; defaults to +all_jobs+
# @raise [NoJobsDefined] when no handler table exists, including after
#   +clear!+ has reset it to nil
# @raise [NoSuchJob] when a requested job has no handler
def prep(jobs=nil)
  # defined? alone stays true after clear! assigns nil, so also require a
  # truthy value; otherwise the lookups below would fail on nil.
  raise NoJobsDefined unless defined?(@@handlers) && @@handlers
  @@error_handler = nil unless defined?(@@error_handler)

  jobs ||= all_jobs

  jobs.each do |job|
    raise(NoSuchJob, job) unless @@handlers[job]
  end

  log "Working #{jobs.size} jobs: [ #{jobs.join(' ')} ]"

  jobs.each { |job| beanstalk.tubes.watch(job) }

  # Stop watching any tube that is not one of the requested jobs.
  beanstalk.tubes.watched.each do |tube|
    beanstalk.tubes.ignore(tube.name) unless jobs.include?(tube.name)
  end
rescue Beaneater::NotConnected => e
  # Logs the failure and exits the process.
  failed_connection(e)
end

#status(job_id) ⇒ Object



31
32
33
34
# File 'lib/stalker.rb', line 31

# Looks up a job by id and returns its stats.
#
# @param job_id [Object] the id used to index the pool's jobs
# @return [Object, nil] the job's stats, or nil when the lookup returns nil
def status(job_id)
  found = beanstalk.jobs[job_id]
  return nil if found.nil?

  found.stats
end

#work(jobs = nil) ⇒ Object



69
70
71
72
# File 'lib/stalker.rb', line 69

# Prepares the requested jobs and then processes jobs one at a time in an
# endless loop.
#
# @param jobs [Array<String>, nil] job names passed through to +prep+
def work(jobs=nil)
  prep(jobs)
  loop do
    work_one_job
  end
end

#work_one_job ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/stalker.rb', line 76

# Reserves a single job, runs its handler and deletes the job on success.
#
# The job body is parsed as JSON into a name and arguments. Before-handlers
# (if any) are called with the job name, then the job's handler is called with
# the arguments; both run under a timeout of the job's ttr minus one second.
#
# On a connection failure the process exits via +failed_connection+. On any
# other StandardError the exception is logged, the job is buried
# (best-effort), and the error handler, if set, is invoked.
def work_one_job
  job = beanstalk.tubes.reserve
  name, args = JSON.parse(job.body)
  log_job_begin(name, args)
  handler = @@handlers[name]
  raise(NoSuchJob, name) unless handler

  begin
    Timeout.timeout(job.ttr - 1) do
      if defined?(@@before_handlers) && @@before_handlers.respond_to?(:each)
        @@before_handlers.each { |hook| hook.call(name) }
      end
      handler.call(args)
    end
  rescue Timeout::Error
    raise JobTimeout, "#{name} hit #{job.ttr-1}s timeout"
  end

  job.delete
  log_job_end(name)
rescue Beaneater::NotConnected => e
  failed_connection(e)
rescue SystemExit
  raise
rescue => e
  log_error exception_message(e)

  # Burying is best-effort: a failure here must not hide the original error.
  begin
    job.bury
  rescue StandardError
    nil
  end

  log_job_end(name, 'failed') if @job_begun

  on_error = error_handler
  if on_error
    # One-argument handlers get only the exception; others also receive the
    # job name and arguments.
    callback_args = on_error.arity == 1 ? [e] : [e, name, args]
    on_error.call(*callback_args)
  end
end