Class: Qless::Worker
- Inherits:
-
Object
- Object
- Qless::Worker
- Defined in:
- lib/qless/worker.rb
Overview
This is heavily inspired by Resque’s excellent worker: github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb
Instance Attribute Summary collapse
-
#job_reserver ⇒ Object
The object responsible for reserving jobs from the Qless server, using some reasonable strategy (e.g. round robin or ordered).
-
#output ⇒ Object
An IO-like object that logging output is sent to.
-
#run_as_single_process ⇒ Object
Whether the worker should run in a single process i.e.
-
#term_timeout ⇒ Object
How long the child process is given to exit before forcibly killing it.
-
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT.
-
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT.
Class Method Summary collapse
-
.start ⇒ Object
Starts a worker based on ENV vars.
Instance Method Summary collapse
-
#initialize(job_reserver, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #pause_processing ⇒ Object
- #paused? ⇒ Boolean
- #perform(job) ⇒ Object
- #perform_job_in_child_process(job) ⇒ Object
- #reserve_job ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #shutdown? ⇒ Boolean
- #unpause_processing ⇒ Object
- #work(interval = 5.0) ⇒ Object
Constructor Details
#initialize(job_reserver, options = {}) ⇒ Worker
Returns a new instance of Worker.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/qless/worker.rb', line 13
#
# Builds a new worker around the given job reserver.
#
# @param job_reserver [Object] object used to reserve jobs; stored through
#   the job_reserver= writer.
# @param options [Hash] optional settings:
#   - :very_verbose, :verbose, :run_as_single_process are read with #[] and
#     are therefore nil when absent.
#   - :output defaults to $stdout.
#   - :term_timeout defaults to 4.0.
# @return [Worker] a new instance of Worker
def initialize(job_reserver, options = {})
  self.job_reserver = job_reserver
  # The worker starts neither shut down nor paused.
  @shutdown = @paused = false

  self.very_verbose = options[:very_verbose]
  self.verbose = options[:verbose]
  self.run_as_single_process = options[:run_as_single_process]
  self.output = options.fetch(:output, $stdout)
  self.term_timeout = options.fetch(:term_timeout, 4.0)

  # Map of path prefixes to their replacements; presumably applied to
  # backtraces by code outside this listing.
  @backtrace_replacements = { Dir.pwd => '.' }
  @backtrace_replacements[ENV['GEM_HOME']] = '<GEM_HOME>' if ENV.key?('GEM_HOME')

  # Visually separate this worker's output when any logging is enabled.
  output.puts "\n\n\n" if verbose || very_verbose
  log "Instantiated Worker"
end
Instance Attribute Details
#job_reserver ⇒ Object
The object responsible for reserving jobs from the Qless server, using some reasonable strategy (e.g. round robin or ordered)
46 47 48 |
# File 'lib/qless/worker.rb', line 46
#
# The object responsible for reserving jobs from the Qless server, using some
# reasonable strategy (e.g. round robin or ordered).
#
# @return [Object] the current value of @job_reserver
def job_reserver
  @job_reserver
end
#output ⇒ Object
An IO-like object that logging output is sent to. Defaults to $stdout.
42 43 44 |
# File 'lib/qless/worker.rb', line 42
#
# An IO-like object that logging output is sent to. Defaults to $stdout.
#
# @return [Object] the current value of @output
def output
  @output
end
#run_as_single_process ⇒ Object
Whether the worker should run in a single process, i.e. not fork a child process to do the work. This should only be true in a dev/test environment.
38 39 40 |
# File 'lib/qless/worker.rb', line 38
#
# Whether the worker should run in a single process, i.e. not fork a child
# process to do the work. This should only be true in a dev/test environment.
#
# @return [Object] the current value of @run_as_single_process
def run_as_single_process
  @run_as_single_process
end
#term_timeout ⇒ Object
How long the child process is given to exit before forcibly killing it.
49 50 51 |
# File 'lib/qless/worker.rb', line 49
#
# How long the child process is given to exit before forcibly killing it.
#
# @return [Object] the current value of @term_timeout
def term_timeout
  @term_timeout
end
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
30 31 32 |
# File 'lib/qless/worker.rb', line 30
#
# Whether the worker should log basic info to STDOUT.
#
# @return [Object] the current value of @verbose
def verbose
  @verbose
end
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT
33 34 35 |
# File 'lib/qless/worker.rb', line 33
#
# Whether the worker should log lots of info to STDOUT.
#
# @return [Object] the current value of @very_verbose
def very_verbose
  @very_verbose
end
Class Method Details
.start ⇒ Object
Starts a worker based on ENV vars. Supported ENV vars:
- REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically)
- QUEUES=high,medium,low or QUEUE=blah
- JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin
- INTERVAL=3.2
- VERBOSE=true (to enable logging)
- VVERBOSE=true (to enable very verbose logging)
- RUN_AS_SINGLE_PROCESS=true (false will fork children to do work, true will keep it single process)
This is designed to be called from a rake task
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/qless/worker.rb', line 60
#
# Starts a worker based on ENV vars. Supported ENV vars:
#   - QUEUES=high,medium,low or QUEUE=blah
#   - JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin (default: Ordered)
#   - INTERVAL=3.2 (default: 5.0)
#   - VERBOSE=true (to enable logging)
#   - VVERBOSE=true (to enable very verbose logging)
#   - RUN_AS_SINGLE_PROCESS=true (false will fork children to do work,
#     true will keep it single process)
#
# A boolean flag counts as disabled when it is unset, empty, or one of
# false/0/no/off (case-insensitive); any other value enables it.
#
# This is designed to be called from a rake task.
#
# @raise [RuntimeError] when neither QUEUES nor QUEUE yields a queue
# @return [Object] whatever #work returns
def self.start
  client = Qless::Client.new
  queue_names = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',')
  queues = queue_names.map { |q| client.queues[q.strip] }
  if queues.none?
    raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker."
  end

  reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues)
  interval = Float(ENV.fetch('INTERVAL', 5.0))

  # `!!ENV[...]` treated any set value -- including the string "false" --
  # as true, so RUN_AS_SINGLE_PROCESS=false could never request forking.
  disabled_values = ['', 'false', '0', 'no', 'off']
  env_flag = lambda do |name|
    !disabled_values.include?(ENV[name].to_s.strip.downcase)
  end

  options = {}
  options[:verbose] = env_flag.call('VERBOSE')
  options[:very_verbose] = env_flag.call('VVERBOSE')
  options[:run_as_single_process] = env_flag.call('RUN_AS_SINGLE_PROCESS')

  new(reserver, options).work(interval)
end
Instance Method Details
#pause_processing ⇒ Object
159 160 161 162 163 |
# File 'lib/qless/worker.rb', line 159
#
# Marks the worker as paused, logs the fact, and updates the process line
# with the job reserver's description.
# NOTE(review): the log text suggests this is invoked from a USR2 signal
# handler registered elsewhere -- confirm.
#
# @return [Object] whatever procline returns
def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
  procline "Paused -- #{@job_reserver.description}"
end
#paused? ⇒ Boolean
155 156 157 |
# File 'lib/qless/worker.rb', line 155
#
# Whether job processing is currently paused.
#
# @return [Boolean] the current value of @paused
def paused?
  @paused
end
#perform(job) ⇒ Object
105 106 107 108 109 110 111 |
# File 'lib/qless/worker.rb', line 105
#
# Runs the job through around_perform. If that raises, the job is failed via
# fail_job; otherwise try_complete is called.
#
# @param job [Object] the job to perform
# @return [Object] the result of fail_job or try_complete
def perform(job)
  around_perform(job)
rescue Exception => error
  # Exception (not just StandardError) is rescued so that any error raised
  # while performing is reported through fail_job instead of propagating.
  fail_job(job, error, caller)
else
  try_complete(job)
end
#perform_job_in_child_process(job) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/qless/worker.rb', line 122
#
# Performs the job inside a with_job block, in a forked child when one is
# created and in the current process otherwise.
#
# @param job [Object] the job to perform
# @return [Object] whatever with_job returns
def perform_job_in_child_process(job)
  with_job(job) do
    # In the child: reconnect, install child handlers and listener, run the
    # job, then leave immediately.
    @child = fork do
      job.reconnect_to_redis
      register_child_signal_handlers
      start_child_pub_sub_listener_for(job.client)
      procline "Processing #{job.description}"
      perform(job)
      exit! # don't run at_exit hooks
    end

    if @child
      wait_for_child
    else
      # NOTE(review): this branch needs `fork` to return a falsy value without
      # running the block -- presumably an override that honors
      # run_as_single_process; confirm against the rest of the class.
      procline "Single processing #{job.description}"
      perform(job)
    end
  end
end
#reserve_job ⇒ Object
113 114 115 116 117 118 119 120 |
# File 'lib/qless/worker.rb', line 113
#
# Asks the job reserver for the next job.
#
# @return [Object, nil] the reserved job, or nil when reserving raised
def reserve_job
  @job_reserver.reserve
rescue Exception => error
  # We want workers to durably stay up, so we don't want errors
  # during job reserving (e.g. network timeouts, etc) to kill
  # the worker.
  log "Got an error while reserving a job: #{error.class}: #{error.message}"
  # Return nil explicitly so the caller never mistakes log's return value
  # for a reserved job.
  nil
end
#shutdown ⇒ Object
142 143 144 |
# File 'lib/qless/worker.rb', line 142
#
# Flags the worker for shutdown; the flag is reported by shutdown?.
#
# @return [Boolean] true
def shutdown
  @shutdown = true
end
#shutdown! ⇒ Object
146 147 148 149 |
# File 'lib/qless/worker.rb', line 146
#
# Flags the worker for shutdown and, unless running as a single process,
# also calls kill_child.
#
# @return [Object, nil] the result of kill_child, or nil when it is skipped
def shutdown!
  shutdown
  kill_child unless run_as_single_process
end
#shutdown? ⇒ Boolean
151 152 153 |
# File 'lib/qless/worker.rb', line 151
#
# Whether shutdown has been requested.
#
# @return [Boolean] the current value of @shutdown
def shutdown?
  @shutdown
end
#unpause_processing ⇒ Object
165 166 167 168 |
# File 'lib/qless/worker.rb', line 165
#
# Logs the resume and clears the paused flag.
# NOTE(review): the log text suggests this is invoked from a CONT signal
# handler registered elsewhere -- confirm.
#
# @return [Boolean] false
def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end
#work(interval = 5.0) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/qless/worker.rb', line 78
#
# Main loop: reserves jobs and performs them until shutdown is requested.
#
# @param interval [Numeric] seconds to sleep while paused or when no job was
#   reserved; when zero, the loop exits as soon as no job is available.
# @return [nil] once the loop has been left
def work(interval = 5.0)
  procline "Starting #{@job_reserver.description}"
  register_parent_signal_handlers
  uniq_clients.each { |client| start_parent_pub_sub_listener_for(client) }

  loop do
    break if shutdown?

    # While paused, reserve nothing; just wait and re-check.
    if paused?
      sleep interval
      next
    end

    unless job = reserve_job
      # A zero interval means "drain and stop" rather than poll forever.
      break if interval.zero?
      procline "Waiting for #{@job_reserver.description}"
      log! "Sleeping for #{interval} seconds"
      sleep interval
      next
    end

    perform_job_in_child_process(job)
  end
ensure
  # make sure the worker deregisters on shutdown
  deregister
end