Class: Backburner::Worker Abstract

Inherits:
Object
  • Object
show all
Includes:
Helpers, Logger
Defined in:
lib/backburner/worker.rb

Overview

This class is abstract.

Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger

Methods included from Helpers

#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc

Constructor Details

#initialize(tube_names = nil) ⇒ Worker

Constructs a new worker for processing jobs within specified tubes.

Examples:

Worker.new(['test.job'])


88
89
90
91
92
# File 'lib/backburner/worker.rb', line 88

# Constructs a worker. The three steps run in this order: the result of
# new_connection is stored in @connection, the given tube names are passed
# through #process_tube_names and stored in @tube_names, and finally
# register_signal_handlers! is called.
#
# @param tube_names [Array, nil] tube names to process (nil by default);
#   presumably an array of strings such as ['test.job'] -- confirm against
#   compact_tube_names, which is not shown here
def initialize(tube_names = nil)
  @connection = new_connection
  @tube_names = process_tube_names(tube_names)
  register_signal_handlers!
end

Class Attribute Details

.known_queue_classesObject



15
16
17
# File 'lib/backburner/worker.rb', line 15

# Lazily-initialized list of known queue classes.
#
# The first call stores an empty Array in @known_queue_classes; every call
# returns that same stored object, so callers may append to it in place.
#
# @return [Array] the memoized list
def known_queue_classes
  @known_queue_classes = [] unless @known_queue_classes
  @known_queue_classes
end

Instance Attribute Details

#connectionObject

The connection object assigned to this worker by `new_connection` during construction



82
83
84
# File 'lib/backburner/worker.rb', line 82

# Reader for @connection, which #initialize assigns from new_connection.
#
# @return [Object] the stored connection (nil if it was never assigned)
def connection
  @connection
end

#tube_namesObject

List of tube names to be watched and processed



82
83
84
# File 'lib/backburner/worker.rb', line 82

# Reader for @tube_names, which #initialize assigns from the result of
# #process_tube_names.
#
# @return [Object] the stored tube names (nil if they were never assigned)
def tube_names
  @tube_names
end

Class Method Details

.enqueue(job_class, args = [], opts = {}) ⇒ Object

Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in secs), `ttr` (time to respond), `queue` (queue name)

Examples:

Backburner::Worker.enqueue NewsletterSender, [self.id, user.id], :ttr => 1000

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/backburner/worker.rb', line 27

# Enqueues a job to be processed later by a worker.
#
# Every option value that is a Proc is called with (job_class, args) and
# replaced by its result before being used. The caller's opts hash is never
# mutated; a shallow copy is worked on instead.
#
# @param job_class [Class] job class; its name is stored in the payload and it
#   is the fallback passed to the priority, respond-timeout and tube-name
#   resolvers when the matching option is absent
# @param args [Array] job arguments; stored in the payload and splatted into
#   the enqueue hooks
# @param opts [Hash] `:pri` (priority), `:delay` (seconds, negative values
#   become 0), `:ttr` (time to respond), `:queue` (queue name), `:shard_key`
#   (stringified, 'X' when nil). All entries are forwarded to the connection.
# @return [Object, nil] the value returned by the connection's put, or nil
#   when the before_enqueue or after_enqueue hook returns a falsy value
# @raise [Beaneater::NotConnected] per the existing documentation, if the
#   queue server cannot be reached -- raised by the connection, not visible here
def self.enqueue(job_class, args = [], opts = {})
  options = opts.dup

  # Invoke Procs if they are sent. Only existing keys are reassigned, which is
  # safe while iterating.
  options.each do |key, value|
    options[key] = value.call(job_class, args) if value.instance_of?(Proc)
  end

  shard_key = options[:shard_key]
  options[:shard_key] = shard_key.nil? ? 'X' : shard_key.to_s

  pri   = resolve_priority(options[:pri] || job_class)
  delay = [0, options[:delay].to_i].max
  ttr   = resolve_respond_timeout(options[:ttr] || job_class)

  # stop if hook is false
  return nil unless Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  data = { class: job_class.name, args: args, ttr: ttr }

  # A Proc can only remain here if the original :queue Proc itself returned a
  # Proc; that second-level Proc receives just the job class.
  queue = options[:queue]
  queue = queue.call(job_class) if queue.is_a?(Proc)

  # Resolved values override whatever the caller supplied under the same keys.
  # Built once, outside the retry block, because it does not change per attempt.
  put_options = options.merge(pri: pri, delay: delay, ttr: ttr)

  response = nil
  connection = nil
  begin
    connection = Backburner::Connection.new(Backburner.configuration.allq_url)
    connection.retryable do
      tube_name = expand_tube_name(queue || job_class)
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      response = connection.put(tube_name, serialized_data, put_options)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    # Runs on success, on early return and on error; connection is still nil
    # if the constructor itself raised.
    connection&.close
  end

  response
end

.start(tube_names = nil) ⇒ Object

Starts processing jobs with the specified tube_names.

Examples:

Backburner::Worker.start(["foo.tube.name"])


75
76
77
78
79
# File 'lib/backburner/worker.rb', line 75

# Builds a worker for the given tubes and runs it.
#
# A SystemExit raised while constructing or running the worker is absorbed
# here, in which case the method returns nil.
#
# @param tube_names [Array, nil] tube names handed to the constructor
# @return [Object, nil] whatever the worker's #start returns, or nil after a
#   SystemExit
def self.start(tube_names = nil)
  worker = new(tube_names)
  worker.start
rescue SystemExit
  # Intentionally ignored: nothing to do on exit.
  nil
end

Instance Method Details

#handle_failure_for_job(job) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/backburner/worker.rb', line 185

# Decides what happens to a job whose processing failed: retry it after a
# delay, or bury it once the retry budget is used up.
#
# job.task.releases is treated as the number of retries already made and is
# compared against resolve_max_job_retries for the job's class. When retrying,
# the delay comes from the class's retry-delay proc, called with the base
# delay and the retry count. If that proc raises a StandardError, the base
# delay is used unchanged.
#
# @param job [Object] must respond to task, job_class, name, retry and bury
# @return [Object, nil] the result of log_job_end, or nil when job_started_at
#   is falsy
def handle_failure_for_job(job)
  log_error "Handle failure for job"

  attempts_so_far = job.task.releases
  retry_limit = resolve_max_job_retries(job.job_class)
  status = "failed: attempt #{attempts_so_far + 1} of #{retry_limit + 1}"

  if attempts_so_far < retry_limit
    base_delay = resolve_retry_delay(job.job_class)
    wait =
      begin
        resolve_retry_delay_proc(job.job_class).call(base_delay, attempts_so_far)
      rescue StandardError
        base_delay # best effort: fall back to the plain delay
      end
    job.retry(attempts_so_far + 1, wait)
    outcome = "#{status}, retrying in #{wait}s"
  else
    # Retry budget exhausted.
    job.bury
    outcome = "#{status}, burying"
  end

  log_job_end(job.name, outcome) if job_started_at
end

#prepareObject

This method is abstract.

Define this in your worker subclass

Used to prepare the job queues before job processing is initiated.

Runs once before processing begins. It is recommended to watch tubes or print a message to the logs with `log_info`.

Examples:

@worker.prepare

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect.



114
115
116
# File 'lib/backburner/worker.rb', line 114

# Abstract hook: concrete workers must override this method.
#
# @raise [NotImplementedError] always, with a message naming the receiver's
#   class so the missing override is easy to locate
def prepare
  raise NotImplementedError, "#{self.class} must implement #prepare"
end

#process_tube_names(tube_names) ⇒ Object

Note:

This method can be overridden in inherited workers

Processes the given tube_names array. Should return the normalized tube names as an array of strings.

Override this method to add more complex tube name processing.

Examples:

process_tube_names([['foo'], ['bar'], ['baz']])
=> ['foo', 'bar', 'baz']


135
136
137
# File 'lib/backburner/worker.rb', line 135

# Normalization hook applied to the tube names given to the constructor.
# This default implementation simply delegates to compact_tube_names (defined
# elsewhere) and returns its result.
#
# @param tube_names [Array, nil] tube names as supplied by the caller
# @return [Object] whatever compact_tube_names returns; per the existing
#   documentation this should be an array of strings
def process_tube_names(tube_names)
  compact_tube_names(tube_names)
end

#shutdownObject

Triggers this worker to shutdown



119
120
121
122
123
124
# File 'lib/backburner/worker.rb', line 119

# Triggers this worker to shut down: logs 'Worker exiting...' and then exits
# the process via Kernel.exit (which raises SystemExit).
#
# The message is logged from a separate thread, presumably because this can
# run inside a signal handler where the logger may not be usable directly --
# confirm against register_signal_handlers!. The thread is now waited for
# (bounded), so the message is actually written before the exit.
#
# @raise [SystemExit] always
def shutdown
  begin
    log_thread = Thread.new do
      log_info 'Worker exiting...'
    end
    # Bounded wait: a stuck logger must not block shutdown indefinitely.
    log_thread.join(1)
  rescue StandardError
    # join re-raises an error from the logging thread; a failed log line must
    # never prevent the exit below.
  end
  Kernel.exit
end

#startObject

Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.

Examples:

@worker.start

Raises:

  • (NotImplementedError)


100
101
102
# File 'lib/backburner/worker.rb', line 100

# Abstract entry point: concrete workers must override this method.
#
# @raise [NotImplementedError] always, with a message naming the receiver's
#   class so the missing override is easy to locate
def start
  raise NotImplementedError, "#{self.class} must implement #start"
end

#work_one_job(conn = connection, tube_name = nil) ⇒ Object

Performs a job by reserving a job from beanstalk and processing it

Examples:

@worker.work_one_job

Raises:

  • (Beaneater::NotConnected)

    If beanstalk fails to connect multiple times.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/backburner/worker.rb', line 144

# Reserves one job from a tube and processes it.
#
# When no tube is given, one is picked at random from @tube_names and an
# error is logged, since sampling is discouraged.
#
# Failure handling:
# * An ordinary error (StandardError) while reserving is logged, followed by
#   a random pause of up to 3 seconds, and nil is returned. Non-standard
#   exceptions such as SystemExit (raised by Kernel.exit in #shutdown) and
#   Interrupt are NOT rescued, so a shutdown requested while waiting for a
#   job still terminates the worker.
# * No job, or a job without a body: random pause of up to 3 seconds.
# * Backburner::Job::JobFormatInvalid while processing: logged only.
# * Any other StandardError while processing: logged (except RetryJob, whose
#   message is not logged), then handed to handle_failure_for_job and
#   handle_error.
#
# @param conn [Object] connection to reserve from; defaults to #connection
# @param tube_name [String, nil] tube to reserve from; nil picks a random one
# @return [Object, nil] the reserved job (which may be nil), or nil when
#   reserving failed
def work_one_job(conn = connection, tube_name = nil)
  if tube_name.nil?
    log_error 'Sampling tube, this is bad practice for Allq'
    tube_name = @tube_names.sample
  end

  begin
    job = reserve_job(conn, tube_name)
  rescue StandardError => e
    # Deliberately not `rescue Exception`: exit and interrupt signals must
    # propagate out of the reserve call.
    log_error "Exception: #{e.full_message}"
    sleep(rand * 3)
    return
  end

  if job && job.body
    begin
      log_job_begin(job.name, job.args)
      job.process
      log_job_end(job.name)
    rescue Backburner::Job::JobFormatInvalid => e
      log_error exception_message(e)
    rescue StandardError => e # Error occurred processing job
      log_error "Exception during process"
      log_error exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

      # job is always present here; this branch is only reached when it had a body.
      handle_failure_for_job(job)

      handle_error(e, job.name, job.args, job)
    end
  else
    # Nothing to do: back off briefly before the caller polls again.
    sleep(rand * 3)
  end
  job
end