Method: Beaneater::Jobs#process!

Defined in:
lib/beaneater/job/collection.rb

#process!(options = {}) ⇒ Object

Watch, reserve, process and delete or bury or release jobs.

Parameters:

  • options (Hash{String => Integer}) (defaults to: {})

    Settings for processing

Options Hash (options):

  • release_delay (Integer)

    Delay in seconds before to make job ready again

  • reserve_timeout (Integer)

    Number of seconds to wait for a job before checking a different server



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/beaneater/job/collection.rb', line 108

def process!(options={})
  release_delay = options.delete(:release_delay) || RELEASE_DELAY
  reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
  client.tubes.watch!(*processors.keys)
  while !stop? do
    begin
      @current_job = client.tubes.reserve(reserve_timeout)
      processor = processors[@current_job.tube]
      begin
        processor[:block].call(@current_job)
        @current_job.delete
      rescue *processor[:retry_on]
        if @current_job.stats.releases < processor[:max_retries]
          @current_job.release(:delay => release_delay)
        end
      end
    rescue AbortProcessingError
      break
    rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError
      retry
    rescue StandardError # handles unspecified errors
      @current_job.bury if @current_job
    ensure # bury if still reserved
      @current_job.bury if @current_job && @current_job.exists? && @current_job.reserved?
      @current_job = nil
    end
  end
end