Class: Beaneater::Jobs

Inherits:
PoolCommand show all
Defined in:
lib/beaneater/job/collection.rb

Overview

Represents collection of job-related commands.

Constant Summary collapse

MAX_RETRIES =

Number of retries to process a job.

3
RELEASE_DELAY =

Delay in seconds before to make job ready again.

1
RESERVE_TIMEOUT =

Number of seconds to wait for a job before checking a different server.

nil

Instance Attribute Summary collapse

Attributes inherited from PoolCommand

#pool

Instance Method Summary collapse

Methods inherited from PoolCommand

#combine_stats, #initialize, #method_missing, #sum_items, #transmit_to_all

Constructor Details

This class inherits a constructor from Beaneater::PoolCommand

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Beaneater::PoolCommand

Instance Attribute Details

#processorsObject

Returns the value of attribute processors


11
12
13
# File 'lib/beaneater/job/collection.rb', line 11

def processors
  @processors
end

Instance Method Details

#find(id) ⇒ Beaneater::Job Also known as: peek, []

Peek (or find) first job from beanstalkd pool.

Examples:

@beaneater_pool.jobs[123] # => <Beaneater::Job>
@beaneater_pool.jobs.find(123) # => <Beaneater::Job>
@beaneater_pool.jobs.peek(123) # => <Beaneater::Job>

Parameters:

  • id (Integer)

    Job id to find

Returns:


32
33
34
35
36
37
# File 'lib/beaneater/job/collection.rb', line 32

def find(id)
  res = transmit_until_res("peek #{id}", :status => "FOUND")
  Job.new(res)
rescue Beaneater::NotFoundError
  nil
end

#find_all(id) ⇒ Array<Beaneater::Job>

Find all jobs with specified id fromm all beanstalkd servers in pool.

Examples:

@beaneater_pool.jobs.find_all(123) # => [<Beaneater::Job>, <Beaneater::Job>]

Parameters:

  • id (Integer)

    Job id to find

Returns:


49
50
51
52
53
54
# File 'lib/beaneater/job/collection.rb', line 49

def find_all(id)
  res = transmit_to_all("peek #{id}")
  res.compact.map { |r| Job.new(r) }
rescue Beaneater::NotFoundError
  []
end

#process!(options = {}) ⇒ Object

Watch, reserve, process and delete or bury or release jobs.

Parameters:

  • options (Hash{String => Integer}) (defaults to: {})

    Settings for processing

Options Hash (options):

  • release_delay (Integer)

    Delay in seconds before to make job ready again

  • reserve_timeout (Integer)

    Number of seconds to wait for a job before checking a different server


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/beaneater/job/collection.rb', line 88

def process!(options={})
  release_delay = options.delete(:release_delay) || RELEASE_DELAY
  reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
  tubes.watch!(*processors.keys)
  loop do
    begin
      job = tubes.reserve(reserve_timeout)
      processor = processors[job.tube]
      begin
        processor[:block].call(job)
        job.delete
      rescue *processor[:retry_on]
        job.release(:delay => release_delay) if job.stats.releases < processor[:max_retries]
      end
    rescue AbortProcessingError
      break
    rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError
      retry
    rescue StandardError # handles unspecified errors
      job.bury if job
    ensure # bury if still reserved
      job.bury if job && job.exists? && job.reserved?
    end
  end
end

#register(tube_name, options = {}, &block) ⇒ Object

Register a processor to handle beanstalkd job on particular tube.

Examples:

@beanstalk.jobs.register('some-tube', :retry_on => [SomeError]) do |job|
  do_something(job)
end

@beanstalk.jobs.register('other-tube') do |job|
  do_something_else(job)
end

Parameters:

  • tube_name (String)

    Tube name

  • options (Hash{String=>RuntimeError}) (defaults to: {})

    settings for processor

  • block (Proc)

    Process beanstalkd job

Options Hash (options):

  • max_retries (Integer)

    Number of retries to process a job

  • retry_on (Array<RuntimeError>)

    Collection of errors to rescue and re-run processor


74
75
76
77
78
79
# File 'lib/beaneater/job/collection.rb', line 74

def register(tube_name, options={}, &block)
  @processors ||= {}
  max_retries = options[:max_retries] || MAX_RETRIES
  retry_on = Array(options[:retry_on])
  @processors[tube_name.to_s] = { :block => block, :retry_on => retry_on, :max_retries => max_retries }
end