Class: Beaneater::Jobs
- Inherits: Object
  - Object
    - Beaneater::Jobs
- Defined in: lib/beaneater/job/collection.rb
Overview
Represents a collection of job-related commands.
Constant Summary
- MAX_RETRIES = 3
  Number of retries to process a job.
- RELEASE_DELAY = 1
  Delay in seconds before a released job becomes ready again.
- RESERVE_TIMEOUT = nil
  Number of seconds to wait for a job before checking a different server.
Instance Attribute Summary
- #client ⇒ Beaneater
  Returns the client instance.
- #current_job ⇒ Object
  Returns the value of attribute current_job.
- #processors ⇒ Array<Proc>
  Returns the collection of procs that handle beanstalkd jobs.
Instance Method Summary
- #find(id) ⇒ Beaneater::Job (also: #peek, #[])
  Peeks at (or finds) a job by id from beanstalkd.
- #initialize(client) ⇒ Jobs (constructor)
  Creates a new Jobs instance.
- #process!(options = {}) ⇒ Object
  Watches the registered tubes, reserves and processes jobs, then deletes, buries, or releases each one.
- #register(tube_name, options = {}, &block) ⇒ Object
  Registers a processor to handle beanstalkd jobs on a particular tube.
- #stop! ⇒ Object
  Sets a flag indicating that the process loop should stop after the current job.
- #stop? ⇒ Boolean
  Returns whether the process loop should stop.
- #transmit(command, options = {}) ⇒ Object
  Delegates transmit to the connection object.
Constructor Details
#initialize(client) ⇒ Jobs
Creates a new Jobs instance.
    # File 'lib/beaneater/job/collection.rb', line 32

    def initialize(client)
      @client = client
    end
Instance Attribute Details
#client ⇒ Beaneater
Returns the client instance.
    # File 'lib/beaneater/job/collection.rb', line 15

    attr_reader :processors, :client, :current_job
#current_job ⇒ Object
Returns the value of attribute current_job.
    # File 'lib/beaneater/job/collection.rb', line 15

    attr_reader :processors, :client, :current_job
#processors ⇒ Array<Proc>
Returns the collection of procs that handle beanstalkd jobs.
    # File 'lib/beaneater/job/collection.rb', line 15

    def processors
      @processors
    end
Instance Method Details
#find(id) ⇒ Beaneater::Job Also known as: peek, []
Peeks at (or finds) a job by id from beanstalkd. Returns nil if the job is not found.
    # File 'lib/beaneater/job/collection.rb', line 53

    def find(id)
      res = transmit("peek #{id}")
      Job.new(client, res)
    rescue Beaneater::NotFoundError
      nil
    end
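The nil-on-miss behavior of find can be tried without a beanstalkd server. The following is a minimal sketch under stated assumptions: FakeJobs, its in-memory store, and the local NotFoundError are hypothetical stand-ins for Beaneater::Jobs, the server, and Beaneater::NotFoundError, and the raw payload is returned instead of a Beaneater::Job.

```ruby
# Hypothetical stand-in for Beaneater::NotFoundError.
class NotFoundError < StandardError; end

# Hypothetical stand-in for Beaneater::Jobs backed by a plain Hash.
class FakeJobs
  def initialize(store)
    @store = store
  end

  # Stand-in for transmit("peek <id>"): raises when the id is unknown.
  def transmit(command)
    id = command[/\Apeek (\d+)\z/, 1]
    @store.fetch(id.to_i) { raise NotFoundError, command }
  end

  # Same shape as Jobs#find: a missing job becomes nil, not an exception.
  def find(id)
    transmit("peek #{id}")
  rescue NotFoundError
    nil
  end
  alias_method :peek, :find
  alias_method :[], :find
end

jobs = FakeJobs.new(1 => 'payload')
p jobs.find(1)  # => "payload"
p jobs.peek(2)  # => nil
p jobs[1]       # => "payload"
```

Because a miss is reported as nil, callers would check the return value rather than rescue an error.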
#process!(options = {}) ⇒ Object
Watches the registered tubes, reserves and processes jobs, then deletes, buries, or releases each one.
    # File 'lib/beaneater/job/collection.rb', line 106

    def process!(options={})
      release_delay = options.delete(:release_delay) || RELEASE_DELAY
      reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
      client.tubes.watch!(*processors.keys)
      while !stop? do
        begin
          @current_job = client.tubes.reserve(reserve_timeout)
          processor = processors[@current_job.tube]
          begin
            processor[:block].call(@current_job)
            @current_job.delete
          rescue *processor[:retry_on]
            if @current_job.stats.releases < processor[:max_retries]
              @current_job.release(:delay => release_delay)
            end
          end
        rescue AbortProcessingError
          break
        rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError
          retry
        rescue StandardError # handles unspecified errors
          @current_job.bury if @current_job
        ensure # bury if still reserved
          @current_job.bury if @current_job && @current_job.exists? && @current_job.reserved?
          @current_job = nil
        end
      end
    end
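The listing above decides one of three fates for each reserved job: delete on success, release when a retryable error occurs and retries remain, and bury otherwise. The sketch below isolates that decision so it can be run without a server. It is an illustration under assumptions, not the library code: FakeJob, handle, and outcome_for are hypothetical names, the release count is read from job.releases rather than job.stats.releases, and the reserve loop plus the AbortProcessingError and timeout branches are left out.

```ruby
# Hypothetical stand-in for Beaneater::Job; it only records its fate.
class FakeJob
  attr_reader :outcome, :releases

  def initialize(releases)
    @releases = releases
    @outcome = :reserved
  end

  def reserved?
    @outcome == :reserved
  end

  def delete
    @outcome = :deleted
  end

  def bury
    @outcome = :buried
  end

  def release(options = {})
    @outcome = :released
  end
end

# Mirrors the per-job branch of process! with no network access.
def handle(job, processor)
  begin
    processor[:block].call(job)
    job.delete
  rescue *processor[:retry_on]
    # Retryable error: release only while retries remain.
    job.release(:delay => 1) if job.releases < processor[:max_retries]
  end
rescue StandardError
  job.bury # any error not listed in :retry_on
ensure
  job.bury if job.reserved? # e.g. retries exhausted, so never released
end

# Convenience wrapper returning the final state of a fresh job.
def outcome_for(releases, processor)
  job = FakeJob.new(releases)
  handle(job, processor)
  job.outcome
end

ok    = { :block => proc { |job| job }, :retry_on => [], :max_retries => 3 }
flaky = { :block => proc { raise ArgumentError }, :retry_on => [ArgumentError], :max_retries => 3 }
fatal = { :block => proc { raise 'boom' }, :retry_on => [], :max_retries => 3 }

p outcome_for(0, ok)     # => :deleted
p outcome_for(0, flaky)  # => :released
p outcome_for(3, flaky)  # => :buried
p outcome_for(0, fatal)  # => :buried
```

Note that an empty :retry_on list makes `rescue *processor[:retry_on]` match nothing, so every error from the block falls through to the outer StandardError handler.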
#register(tube_name, options = {}, &block) ⇒ Object
Registers a processor to handle beanstalkd jobs on a particular tube.
    # File 'lib/beaneater/job/collection.rb', line 80

    def register(tube_name, options={}, &block)
      @processors ||= {}
      max_retries = options[:max_retries] || MAX_RETRIES
      retry_on = Array(options[:retry_on])
      @processors[tube_name.to_s] = { :block => block, :retry_on => retry_on, :max_retries => max_retries }
    end
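The option handling in register can be seen in isolation: the tube name is stored as a string, :max_retries falls back to MAX_RETRIES, and :retry_on is normalized to an array by Kernel#Array. The sketch below re-creates only that logic. ProcessorRegistry is a hypothetical stand-alone class used for illustration, not part of Beaneater.

```ruby
# Hypothetical stand-in that mirrors the option handling in Jobs#register.
class ProcessorRegistry
  MAX_RETRIES = 3 # same default as Beaneater::Jobs::MAX_RETRIES

  attr_reader :processors

  def register(tube_name, options = {}, &block)
    @processors ||= {}
    max_retries = options[:max_retries] || MAX_RETRIES
    retry_on = Array(options[:retry_on]) # nil => [], single class => [class]
    @processors[tube_name.to_s] =
      { :block => block, :retry_on => retry_on, :max_retries => max_retries }
  end
end

registry = ProcessorRegistry.new
registry.register(:emails) { |job| job }
registry.register('reports', :retry_on => ArgumentError, :max_retries => 5) { |job| job }

p registry.processors.keys                      # => ["emails", "reports"]
p registry.processors['emails'][:retry_on]      # => []
p registry.processors['emails'][:max_retries]   # => 3
p registry.processors['reports'][:retry_on]     # => [ArgumentError]
```

Registering the same tube name twice replaces the earlier entry, since the hash is keyed by the stringified tube name.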
#stop! ⇒ Object
Sets a flag indicating that the process loop should stop after the current job.
    # File 'lib/beaneater/job/collection.rb', line 88

    def stop!
      @stop = true
    end
#stop? ⇒ Boolean
Returns whether the process loop should stop.
    # File 'lib/beaneater/job/collection.rb', line 95

    def stop?
      !!@stop
    end
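The stop!/stop? pair forms a cooperative stop flag: the loop checks the flag between jobs, so the job in progress always finishes. A minimal sketch of the pattern follows. StoppableLoop and its in-memory item list are hypothetical and stand in for the process! loop and the beanstalkd tubes.

```ruby
# Hypothetical loop that uses the same stop flag pattern as Beaneater::Jobs.
class StoppableLoop
  attr_reader :handled

  def initialize(items)
    @items = items.dup
    @handled = []
  end

  def stop!
    @stop = true
  end

  # !! coerces the unset (nil) instance variable to false.
  def stop?
    !!@stop
  end

  # Handles items until the list is empty or stop! has been called.
  # The block stands in for a job processor and may call stop!.
  def run
    until stop? || @items.empty?
      item = @items.shift
      @handled << item
      yield item, self
    end
  end
end

worker = StoppableLoop.new([:a, :b, :c])
p worker.stop?    # => false
worker.run { |item, w| w.stop! if item == :b }
p worker.handled  # => [:a, :b]
p worker.stop?    # => true
```

The flag is only consulted at the top of each iteration, which is why :b is still recorded as handled even though stop! was called while it was being processed.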
#transmit(command, options = {}) ⇒ Object
Delegates transmit to the connection object.
    # File 'lib/beaneater/job/collection.rb', line 39

    def transmit(command, options={})
      client.connection.transmit(command, **options)
    end