Class: Backburner::Worker Abstract
- Inherits: Object
  - Object
  - Backburner::Worker
- Defined in: lib/backburner/worker.rb
Overview
Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.
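The override points named above can be sketched as follows. This is a minimal, self-contained illustration rather than the library's own code: `SketchWorkerBase` is a hypothetical stand-in for `Backburner::Worker` (so the snippet runs without the gem or a queue server), and `EchoWorker`, its `log` array, and the `max_jobs` bound are assumptions made for the example.

```ruby
# Hypothetical stand-in for Backburner::Worker so the sketch runs on its own.
# It mirrors only the abstract contract documented on this page.
class SketchWorkerBase
  attr_reader :tube_names

  def initialize(tube_names = nil)
    @tube_names = process_tube_names(tube_names)
  end

  # Default normalization; subclasses may override.
  def process_tube_names(tube_names)
    Array(tube_names).compact
  end

  def prepare
    raise NotImplementedError
  end

  def start
    raise NotImplementedError
  end
end

# Illustrative subclass overriding the three documented hooks.
class EchoWorker < SketchWorkerBase
  attr_reader :log

  # Normalize to unique, stripped strings.
  def process_tube_names(tube_names)
    super.map { |name| name.to_s.strip }.reject(&:empty?).uniq
  end

  # Runs once before processing; here it only records a message.
  def prepare
    @log = ["watching #{tube_names.join(', ')}"]
  end

  # A real worker would loop indefinitely; this sketch is bounded by max_jobs.
  def start(max_jobs = 2)
    prepare
    max_jobs.times { |i| @log << "processed job #{i + 1}" }
    @log
  end
end

worker = EchoWorker.new([' emails ', :emails, nil, 'reports'])
worker.start
```

Because the constructor calls `process_tube_names`, the normalization override takes effect as soon as the worker is built, before `prepare` or `start` run.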
Direct Known Subclasses
Backburner::Workers::Forking, Backburner::Workers::Simple, Backburner::Workers::Threading, Backburner::Workers::ThreadsOnFork
Class Attribute Summary
- .known_queue_classes ⇒ Object
Instance Attribute Summary
-
#connection ⇒ Object
Connection this worker uses to reserve jobs.
-
#tube_names ⇒ Object
List of tube names to be watched and processed.
Class Method Summary
-
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker.
-
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
Instance Method Summary
- #handle_failure_for_job(job) ⇒ Object
-
#initialize(tube_names = nil) ⇒ Worker
constructor
Constructs a new worker for processing jobs within specified tubes.
-
#prepare ⇒ Object
abstract
Used to prepare the job queues before job processing is initiated.
-
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array.
-
#shutdown ⇒ Object
Triggers this worker to shut down.
-
#start ⇒ Object
Starts processing ready jobs indefinitely.
-
#work_one_job(conn = connection, tube_name = nil) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it.
Methods included from Logger
included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger
Methods included from Helpers
#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc
Constructor Details
#initialize(tube_names = nil) ⇒ Worker
Constructs a new worker for processing jobs within specified tubes.
# File 'lib/backburner/worker.rb', line 88
def initialize(tube_names = nil)
  @connection = new_connection
  @tube_names = process_tube_names(tube_names)
  register_signal_handlers!
end
Class Attribute Details
.known_queue_classes ⇒ Object
# File 'lib/backburner/worker.rb', line 15
def known_queue_classes
  @known_queue_classes ||= []
end
Instance Attribute Details
#connection ⇒ Object
Connection this worker uses to reserve jobs.
# File 'lib/backburner/worker.rb', line 82
def connection
  @connection
end
#tube_names ⇒ Object
List of tube names to be watched and processed.
# File 'lib/backburner/worker.rb', line 82
def tube_names
  @tube_names
end
Class Method Details
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in seconds), `ttr` (time to respond), `queue` (queue name).
# File 'lib/backburner/worker.rb', line 27
def self.enqueue(job_class, args = [], opts = {})
  options = opts.dup

  # Invoke Procs if they are sent
  options.each_key do |k|
    if options[k].instance_of?(Proc)
      options[k] = options[k].call job_class, args
    end
  end

  options[:shard_key] = options[:shard_key].nil? ? 'X' : options[:shard_key].to_s
  pri = resolve_priority(options[:pri] || job_class)
  delay = [0, options[:delay].to_i].max
  ttr = resolve_respond_timeout(options[:ttr] || job_class)
  res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  return nil unless res # stop if hook is false

  data = { class: job_class.name, args: args, ttr: ttr }
  queue = options[:queue] && (options[:queue].is_a?(Proc) ? options[:queue].call(job_class) : options[:queue])

  begin
    response = nil
    connection = Backburner::Connection.new(Backburner.configuration.allq_url)
    connection.retryable do
      tube_name = expand_tube_name(queue || job_class)
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      send_data = {
        pri: pri,
        delay: delay,
        ttr: ttr
      }
      options.merge!(send_data)
      puts "OPTIONS #{options}"
      response = connection.put(tube_name, serialized_data, options)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    connection.close if connection
  end

  response
end
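The option handling at the top of `.enqueue` can be illustrated in isolation. The sketch below is an approximation under stated assumptions, not the library's code: `normalize_enqueue_options` and `NewsletterJob` are hypothetical names, and priority and TTR resolution are left out because they depend on Backburner configuration. It shows only that Proc-valued options are called with the job class and args, that the shard key defaults to `'X'`, and that the delay is clamped to zero.

```ruby
# Standalone sketch of the option handling in .enqueue (no Backburner required).
# normalize_enqueue_options is a hypothetical helper, not a library method.
def normalize_enqueue_options(job_class, args = [], opts = {})
  options = opts.dup # leave the caller's hash untouched

  # Proc-valued options are resolved by calling them with the job class and args.
  options.each_key do |key|
    options[key] = options[key].call(job_class, args) if options[key].instance_of?(Proc)
  end

  # A missing shard key defaults to 'X'; anything else is stringified.
  options[:shard_key] = options[:shard_key].nil? ? 'X' : options[:shard_key].to_s
  # A negative or missing delay is clamped to 0.
  options[:delay] = [0, options[:delay].to_i].max
  options
end

class NewsletterJob; end # hypothetical job class

caller_opts = { delay: -5, shard_key: ->(_klass, args) { args.first } }
resolved = normalize_enqueue_options(NewsletterJob, [42], caller_opts)
```

Working on a duplicate of `opts` means a hash reused across several enqueue calls keeps its Procs, so each call resolves them afresh.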
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
# File 'lib/backburner/worker.rb', line 75
def self.start(tube_names = nil)
  new(tube_names).start
rescue SystemExit
  # do nothing
end
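The `rescue SystemExit` clause matters because `Kernel.exit` works by raising `SystemExit`, so a worker that exits from inside `#start` returns control to the caller of `.start` instead of terminating the process. A minimal sketch of that interaction, using a hypothetical `ExitingWorker` class in place of a real worker:

```ruby
# Hypothetical worker used only to show how .start absorbs Kernel.exit.
class ExitingWorker
  def self.start
    new.start
  rescue SystemExit
    # swallowed, mirroring the rescue in .start; the method returns nil
  end

  def start
    Kernel.exit # raises SystemExit rather than terminating immediately
  end
end

result = ExitingWorker.start
still_running = true # reached because SystemExit was rescued
```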
Instance Method Details
#handle_failure_for_job(job) ⇒ Object
# File 'lib/backburner/worker.rb', line 185
def handle_failure_for_job(job)
  log_error "Handle failure for job"
  num_retries = job.task.releases
  max_job_retries = resolve_max_job_retries(job.job_class)
  retry_status = "failed: attempt #{num_retries+1} of #{max_job_retries+1}"

  if num_retries < max_job_retries # retry again
    retry_delay = resolve_retry_delay(job.job_class)
    delay = resolve_retry_delay_proc(job.job_class).call(retry_delay, num_retries) rescue retry_delay
    job.retry(num_retries + 1, delay)
    log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
  else # retries failed, bury
    job.bury
    log_job_end(job.name, "#{retry_status}, burying") if job_started_at
  end
end
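The retry-or-bury decision in this method can be sketched on its own. The helper `retry_decision` and the `cubic_backoff` lambda below are hypothetical names chosen for illustration; the sketch assumes only what the listing shows: a job is retried while its release count is below the maximum, the delay comes from a proc called with the base delay and the retry count, and the base delay is used if that proc raises.

```ruby
# Standalone sketch of the retry-or-bury decision (no Backburner required).
# releases: how many times the job has already been released for retry.
def retry_decision(releases, max_retries, base_delay, delay_proc)
  return [:bury, nil] unless releases < max_retries

  delay = begin
    delay_proc.call(base_delay, releases)
  rescue StandardError
    base_delay # fall back to the plain delay if the proc fails
  end
  [:retry, delay]
end

# Hypothetical backoff: base delay plus the cube of the retry count.
cubic_backoff = ->(min_delay, num_retries) { min_delay + num_retries**3 }

retry_decision(0, 3, 5, cubic_backoff) # => [:retry, 5]
retry_decision(2, 3, 5, cubic_backoff) # => [:retry, 13]
retry_decision(3, 3, 5, cubic_backoff) # => [:bury, nil]
```

With a maximum of 3 retries a job is attempted four times in total, which is why the log message in the listing reports `attempt N of max_job_retries+1`.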
#prepare ⇒ Object
Used to prepare the job queues before job processing is initiated.
Define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with `log_info`.
# File 'lib/backburner/worker.rb', line 114
def prepare
  raise NotImplementedError
end
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array. Should return normalized tube_names as an array of strings.
This method can be overridden in inherited workers to add more complex tube name processing.
# File 'lib/backburner/worker.rb', line 135
def process_tube_names(tube_names)
  compact_tube_names(tube_names)
end
#shutdown ⇒ Object
Triggers this worker to shut down.
# File 'lib/backburner/worker.rb', line 119
def shutdown
  Thread.new do
    log_info 'Worker exiting...'
  end
  Kernel.exit
end
#start ⇒ Object
Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.
# File 'lib/backburner/worker.rb', line 100
def start
  raise NotImplementedError
end
#work_one_job(conn = connection, tube_name = nil) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it
# File 'lib/backburner/worker.rb', line 144
def work_one_job(conn = connection, tube_name = nil)
  if tube_name.nil?
    log_error 'Sampling tube, this is bad practice for Allq'
    tube_name = @tube_names.sample
  end

  begin
    job = reserve_job(conn, tube_name)
  rescue Exception => e
    log_error "Exception: #{e.message}"
    sleep(rand * 3)
    return
  end

  if job && job.body
    begin
      log_job_begin(job.name, job.args)
      job.process
      log_job_end(job.name)
    rescue Backburner::Job::JobFormatInvalid => e
      log_error exception_message(e)
    rescue StandardError => e # Error occurred processing job
      log_error "Exception during process"
      log_error exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

      unless job
        log_error 'Error occurred before we were able to assign a job. Giving up without retrying!'
        return
      end

      handle_failure_for_job(job)

      handle_error(e, job.name, job.args, job)
    end
  else
    sleep(rand * 3)
  end
  job
end