Class: Beaneater::Tube
- Inherits: Object
- Inheritance chain: Beaneater::Tube < Object
- Defined in: lib/beaneater/tube/record.rb
Overview
Beanstalk tube which contains jobs which can be inserted, reserved, et al.
Instance Attribute Summary
- #client ⇒ Object
  Returns the value of attribute client.
- #name ⇒ String
  Name of the tube.
Instance Method Summary
- #clear ⇒ Object
  Clears all unreserved jobs in all states from the tube.
- #config ⇒ Beaneater::Configuration (protected)
  Returns configuration options for beaneater.
- #initialize(client, name) ⇒ Tube (constructor)
  Fetches the specified tube.
- #kick(bounds = 1) ⇒ Hash{String => String, Number}
  Kick specified number of jobs from buried to ready state.
- #pause(delay) ⇒ Array<Hash{String => String, Number}>
  Pause the execution of this tube for specified delay.
- #peek(state) ⇒ Beaneater::Job
  Peek at next job within this tube in given state.
- #put(body, options = {}) ⇒ Hash{String => String, Number}
  Inserts job with specified body onto tube.
- #reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job
  Reserves the next job from tube.
- #safe_use(&block) ⇒ Object (protected)
  Transmits a beanstalk command that requires this tube to be set as used.
- #stats ⇒ Beaneater::StatStruct
  Returns related stats for this tube.
- #to_s ⇒ String (also: #inspect)
  String representation of tube.
- #transmit(command, options = {}) ⇒ Object
  Delegates transmit to the connection object.
Constructor Details
#initialize(client, name) ⇒ Tube
Fetches the specified tube.
    # File 'lib/beaneater/tube/record.rb', line 18

    def initialize(client, name)
      @client = client
      @name = name.to_s
      @mutex = Mutex.new
    end
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
    # File 'lib/beaneater/tube/record.rb', line 9

    attr_reader :name, :client
#name ⇒ String
Returns the name of the tube.
    # File 'lib/beaneater/tube/record.rb', line 9

    def name
      @name
    end
Instance Method Details
#clear ⇒ Object
Clears all unreserved jobs in all states from the tube.
    # File 'lib/beaneater/tube/record.rb', line 132

    def clear
      client.tubes.watch!(self.name)
      %w(delayed buried ready).each do |state|
        while job = self.peek(state.to_sym)
          begin
            job.delete
          rescue Beaneater::UnexpectedResponse, Beaneater::NotFoundError
            # swallow any issues
          end
        end
      end
      client.tubes.ignore(name)
    rescue Beaneater::NotIgnoredError
      # swallow any issues
    end
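The drain loop in #clear can be illustrated with in-memory stand-ins. This is a sketch under stated assumptions, not the library's code: FakeJob, FakeTube, and drain are hypothetical names invented here, and the fake peek returns nil when a state is empty, matching the behaviour documented for #peek.

```ruby
# Hypothetical job that removes itself from its owning queue when deleted.
class FakeJob
  def initialize(queue, body)
    @queue = queue
    @body = body
  end

  def delete
    @queue.delete(self)
  end
end

# Hypothetical tube holding one in-memory queue per job state.
class FakeTube
  def initialize
    @queues = Hash.new { |hash, state| hash[state] = [] }
  end

  def add(state, body)
    @queues[state] << FakeJob.new(@queues[state], body)
  end

  # Returns the next job in the given state, or nil when none is left.
  def peek(state)
    @queues[state].first
  end

  def size
    @queues.values.map(&:length).inject(0, :+)
  end
end

# Peek and delete until each unreserved state is empty, as Tube#clear does.
def drain(tube)
  deleted = 0
  [:delayed, :buried, :ready].each do |state|
    while (job = tube.peek(state))
      job.delete
      deleted += 1
    end
  end
  deleted
end

tube = FakeTube.new
tube.add(:ready, "a")
tube.add(:ready, "b")
tube.add(:buried, "c")
puts drain(tube) # number of jobs deleted
puts tube.size   # jobs remaining afterwards
```

The loop terminates only because each delete removes the job that peek just returned; reserved jobs are never visited because no peek state exposes them.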
#config ⇒ Beaneater::Configuration (protected)
Returns configuration options for beaneater.
    # File 'lib/beaneater/tube/record.rb', line 180

    def config
      Beaneater.configuration
    end
#kick(bounds = 1) ⇒ Hash{String => String, Number}
Kick specified number of jobs from buried to ready state.
    # File 'lib/beaneater/tube/record.rb', line 99

    def kick(bounds=1)
      safe_use { transmit("kick #{bounds}") }
    end
#pause(delay) ⇒ Array<Hash{String => String, Number}>
Pause the execution of this tube for specified delay.
    # File 'lib/beaneater/tube/record.rb', line 123

    def pause(delay)
      transmit("pause-tube #{name} #{delay}")
    end
#peek(state) ⇒ Beaneater::Job
Peek at next job within this tube in given state.
    # File 'lib/beaneater/tube/record.rb', line 66

    def peek(state)
      safe_use do
        res = transmit("peek-#{state}")
        Job.new(client, res)
      end
    rescue Beaneater::NotFoundError
      # Return nil if not found
      nil
    end
#put(body, options = {}) ⇒ Hash{String => String, Number}
Inserts job with specified body onto tube.
    # File 'lib/beaneater/tube/record.rb', line 43

    def put(body, options={})
      safe_use do
        serialized_body = config.job_serializer.call(body)

        options = {
          :pri => config.default_put_pri,
          :delay => config.default_put_delay,
          :ttr => config.default_put_ttr
        }.merge(options)

        cmd_options = "#{options[:pri]} #{options[:delay]} #{options[:ttr]} #{serialized_body.bytesize}"
        transmit("put #{cmd_options}\r\n#{serialized_body}")
      end
    end
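The command text that #put assembles can be sketched as a pure function. This is a minimal illustration of the string-building step only; build_put_command is a hypothetical helper, the values in DEFAULTS are placeholders chosen for this example rather than the library's configured defaults, and the body is assumed to be serialized already.

```ruby
# Placeholder defaults standing in for config.default_put_pri/delay/ttr.
DEFAULTS = { :pri => 65536, :delay => 0, :ttr => 120 }.freeze

# Builds the text of a beanstalkd "put" command for an already-serialized body.
# Caller-supplied options override the defaults, as in Tube#put.
def build_put_command(serialized_body, options = {})
  opts = DEFAULTS.merge(options)
  # The size field counts bytes, not characters, hence bytesize.
  header = "#{opts[:pri]} #{opts[:delay]} #{opts[:ttr]} #{serialized_body.bytesize}"
  "put #{header}\r\n#{serialized_body}"
end

puts build_put_command("hello", :pri => 10).inspect
# A multibyte body: five characters but six bytes in UTF-8.
puts build_put_command("h\u00e9llo").inspect
```

Using bytesize rather than length matters for non-ASCII bodies, where the two values differ.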
#reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job
Reserves the next job from tube.
    # File 'lib/beaneater/tube/record.rb', line 86

    def reserve(timeout=nil, &block)
      client.tubes.watch!(self.name)
      client.tubes.reserve(timeout, &block)
    end
#safe_use(&block) ⇒ Object (protected)
Transmits a beanstalk command that requires this tube to be set as used.
    # File 'lib/beaneater/tube/record.rb', line 169

    def safe_use(&block)
      @mutex.lock
      client.tubes.use(self.name)
      yield
    ensure
      @mutex.unlock
    end
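The locking pattern in #safe_use can be sketched without a server. The following is a minimal illustration, assuming a hypothetical FakeTubes stand-in that only records commands in order; SketchTube is likewise invented for this example. It uses Mutex#synchronize, which releases the lock even when the block raises, in place of the explicit lock/ensure/unlock sequence.

```ruby
# Hypothetical stand-in for client.tubes: records each command in order.
class FakeTubes
  attr_reader :log

  def initialize
    @log = []
  end

  def use(name)
    @log << "use #{name}"
  end
end

# Hypothetical tube that mirrors the use-then-yield pattern of safe_use.
class SketchTube
  def initialize(tubes, name)
    @tubes = tubes
    @name = name.to_s
    @mutex = Mutex.new
  end

  def safe_use
    # synchronize unlocks on the way out, so a raising block cannot
    # leave the lock held.
    @mutex.synchronize do
      @tubes.use(@name)
      yield
    end
  end

  def locked?
    @mutex.locked?
  end
end

tubes = FakeTubes.new
tube = SketchTube.new(tubes, :emails)

# Four threads share one tube object; each use/command pair stays adjacent.
threads = 4.times.map do |i|
  Thread.new { tube.safe_use { tubes.log << "kick #{i}" } }
end
threads.each(&:join)

begin
  tube.safe_use { raise "boom" }
rescue RuntimeError
  # The error propagates, but the mutex has been released.
end

puts tubes.log.inspect
puts tube.locked?
```

Because the mutex is created per tube object in the constructor, it serializes callers of the same object only.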
#stats ⇒ Beaneater::StatStruct
Returns related stats for this tube.
    # File 'lib/beaneater/tube/record.rb', line 110

    def stats
      res = transmit("stats-tube #{name}")
      StatStruct.from_hash(res[:body])
    end
#to_s ⇒ String Also known as: inspect
String representation of tube.
    # File 'lib/beaneater/tube/record.rb', line 154

    def to_s
      "#<Beaneater::Tube name=#{name.inspect}>"
    end
#transmit(command, options = {}) ⇒ Object
Delegates transmit to the connection object.
    # File 'lib/beaneater/tube/record.rb', line 27

    def transmit(command, options={})
      client.connection.transmit(command, **options)
    end
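Several of the methods documented here reduce to formatting a command string and handing it to #transmit. The sketch below collects those strings in one place for reference; command_for is a hypothetical helper written for this illustration and is not part of Beaneater.

```ruby
# Hypothetical helper: returns the command text that the documented
# methods pass to transmit, given the tube name and an optional argument.
def command_for(action, tube_name, arg = nil)
  case action
  when :kick  then "kick #{arg || 1}"                # Tube#kick(bounds = 1)
  when :pause then "pause-tube #{tube_name} #{arg}"  # Tube#pause(delay)
  when :peek  then "peek-#{arg}"                     # Tube#peek(state)
  when :stats then "stats-tube #{tube_name}"         # Tube#stats
  else raise ArgumentError, "unknown action: #{action.inspect}"
  end
end

puts command_for(:kick, "emails", 5)
puts command_for(:pause, "emails", 30)
puts command_for(:peek, "emails", :buried)
puts command_for(:stats, "emails")
```

The kick and peek commands do not name the tube; they rely on #safe_use having selected it first, whereas pause and stats pass the name explicitly.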