Class: Bluth::Worker
- Inherits:
-
Storable
- Object
- Storable
- Bluth::Worker
- Includes:
- WorkerBase, Daemonizable, Familia, Familia::Stamps, Logging
- Defined in:
- lib/bluth/worker.rb
Direct Known Subclasses
Class Attribute Summary collapse
-
.interval ⇒ Object
Returns the value of attribute interval.
-
.reconnect_delay ⇒ Object
Returns the value of attribute reconnect_delay.
-
.reconnect_tries ⇒ Object
Returns the value of attribute reconnect_tries.
Attributes included from Daemonizable
Instance Method Summary collapse
Methods included from Daemonizable
#change_privilege, #daemonize, included, #on_restart, #pid, #restart
Methods included from Logging
#debug, debug, debug?, #log, log, #log_error, log_error, #silent, #silent=, silent?, #trace, trace, trace?
Methods included from WorkerBase
#current_job, included, #init, #kill, #name, #wid, #working!
Class Attribute Details
.interval ⇒ Object
Returns the value of attribute interval.
136 137 138 |
# File 'lib/bluth/worker.rb', line 136 def interval @interval end |
.reconnect_delay ⇒ Object
Returns the value of attribute reconnect_delay.
136 137 138 |
# File 'lib/bluth/worker.rb', line 136 def reconnect_delay @reconnect_delay end |
.reconnect_tries ⇒ Object
Returns the value of attribute reconnect_tries.
136 137 138 |
# File 'lib/bluth/worker.rb', line 136 def reconnect_tries @reconnect_tries end |
Instance Method Details
#carefully ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/bluth/worker.rb', line 164 def carefully begin yield rescue Errno::ECONNREFUSED => ex unless Bluth::Worker.reconnect! self.index self.class.onerror.call ex, self if self.class.onerror Familia.info "Reconnect failed :[" end rescue => ex if self.class.onerror self.class.onerror.call ex, self else Familia.info ex. Familia.ld ex.backtrace problem! end #if problem > 5 # ## TODO: SEND EMAIL # task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong # destroy! # EM.stop # exit 1 #end end end |
#run ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/bluth/worker.rb', line 206 def run begin @process_id = $$ @scheduler = Rufus::Scheduler.start_new Bluth.connect self.class.runblock :onstart Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})" Familia.reconnect_all! # Need to reconnect after daemonize save Signal.trap("USR1") do Familia.debug = (Familia.debug == false) Familia.info "Debugging is #{Familia.debug ? 'enabled' : 'disabled'}" end @usr2_reduce = true Signal.trap("USR2") do @usr2_reduce = false if Bluth.queuetimeout <= 2 @usr2_reduce = true if Bluth.queuetimeout >= 60 if @usr2_reduce #Worker.interval /= 2.0 Bluth.queuetimeout /= 2 else #Worker.interval *= 2.0 Bluth.queuetimeout *= 2 end Familia.info "Set intervals: #{Worker.interval} sec / #{Bluth.queuetimeout} sec" end ## TODO: on_the_minute = Time.at(BS.quantize(Stella.now, 1.minute)+1.minute).utc ## first_at ## @option.ontheminute @task = @scheduler.every Worker.interval, :blocking => true, :first_in => '2s' do |task| carefully do Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug? find_gob task end end @scheduler.join rescue Interrupt => ex puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n") Exiting... (You may need to wait up to #{Bluth.queuetimeout} seconds for this worker to exit cleanly.) EOS # We reconnect to the queue in case we're currently # waiting on a brpop (blocking pop) timeout. destroy! ensure self.class.runblock :onexit end end |