Class: Bluth::Worker

Inherits:
Storable
  • Object
show all
Includes:
WorkerBase, Daemonizable, Familia, Familia::Stamps, Logging
Defined in:
lib/bluth/worker.rb

Direct Known Subclasses

ExampleWorker

Class Attribute Summary collapse

Attributes included from Daemonizable

#log_file, #pid_file

Instance Method Summary collapse

Methods included from Daemonizable

#change_privilege, #daemonize, included, #on_restart, #pid, #restart

Methods included from Logging

#debug, debug, debug?, #log, log, #log_error, log_error, #silent, #silent=, silent?, #trace, trace, trace?

Methods included from WorkerBase

#current_job, included, #init, #kill, #name, #wid, #working!

Class Attribute Details

.intervalObject

Returns the value of attribute interval.



136
137
138
# File 'lib/bluth/worker.rb', line 136

def interval
  @interval
end

.reconnect_delayObject

Returns the value of attribute reconnect_delay.



136
137
138
# File 'lib/bluth/worker.rb', line 136

def reconnect_delay
  @reconnect_delay
end

.reconnect_triesObject

Returns the value of attribute reconnect_tries.



136
137
138
# File 'lib/bluth/worker.rb', line 136

def reconnect_tries
  @reconnect_tries
end

Instance Method Details

#carefullyObject



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/bluth/worker.rb', line 164

def carefully
  begin
    yield
  rescue Errno::ECONNREFUSED => ex
    
    unless Bluth::Worker.reconnect! self.index
      self.class.onerror.call ex, self if self.class.onerror 
      Familia.info "Reconnect failed :["
    end
  
  rescue => ex
    if self.class.onerror 
      self.class.onerror.call ex, self
    else
      Familia.info ex.message
      Familia.ld ex.backtrace
      problem!
    end
    #if problem > 5
    #  ## TODO: SEND EMAIL
    #  task.unschedule unless task.nil? # Kill this worker b/c something is clearly wrong
    #  destroy!
    #  EM.stop
    #  exit 1
    #end
  end
end

#runObject



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/bluth/worker.rb', line 206

def run
  begin
    @process_id = $$
    @scheduler = Rufus::Scheduler.start_new
    Bluth.connect
    self.class.runblock :onstart
    Familia.info "Setting interval: #{Worker.interval} sec (queuetimeout: #{Bluth.queuetimeout})"
    Familia.reconnect_all! # Need to reconnect after daemonize
    save
    Signal.trap("USR1") do
      Familia.debug = (Familia.debug == false)
      Familia.info "Debugging is #{Familia.debug ? 'enabled' : 'disabled'}"
    end
    @usr2_reduce = true
    Signal.trap("USR2") do
      @usr2_reduce = false if Bluth.queuetimeout <= 2
      @usr2_reduce = true if Bluth.queuetimeout >= 60
      if @usr2_reduce
        #Worker.interval /= 2.0
        Bluth.queuetimeout /= 2
      else
        #Worker.interval *= 2.0
        Bluth.queuetimeout *= 2
      end
      Familia.info "Set intervals: #{Worker.interval} sec / #{Bluth.queuetimeout} sec"
    end
    ## TODO: on_the_minute = Time.at(BS.quantize(Stella.now, 1.minute)+1.minute).utc  ## first_at
    ## @option.ontheminute
    @task = @scheduler.every Worker.interval, :blocking => true, :first_in => '2s' do |task|
      carefully do
        Familia.ld "#{$$} TICK @ #{Time.now.utc}" if Familia.debug?
        find_gob task
      end
    end
    @scheduler.join 
    
  rescue Interrupt => ex
    puts <<-EOS.gsub(/(?:^|\n)\s*/, "\n")
      Exiting...
      (You may need to wait up to #{Bluth.queuetimeout} seconds
      for this worker to exit cleanly.)
    EOS
    # We reconnect to the queue in case we're currently
    # waiting on a brpop (blocking pop) timeout.
    destroy!
  ensure
    self.class.runblock :onexit
  end
  
end

#run!Object



192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/bluth/worker.rb', line 192

def run!
  begin
    Bluth.connect
    self.class.runblock :onstart
    carefully do
      find_gob
    end
  rescue Interrupt => ex
    puts $/, "Exiting..."
    self.class.runblock :onexit
    destroy!
  end
end