Class: Beaneater::Tube
- Inherits:
-
Object
- Object
- Beaneater::Tube
- Defined in:
- lib/beaneater/tube/record.rb
Overview
Beanstalk tube which contains jobs which can be inserted, reserved, et al.
Instance Attribute Summary collapse
-
#client ⇒ Object
Returns the value of attribute client.
-
#name ⇒ String
Name of the tube.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears all unreserved jobs in all states from the tube.
-
#config ⇒ Beaneater::Configuration
protected
Returns configuration options for beaneater.
-
#initialize(client, name) ⇒ Tube
constructor
Fetches the specified tube.
-
#kick(bounds = 1) ⇒ Hash{String => String, Number}
Kick specified number of jobs from buried to ready state.
-
#pause(delay) ⇒ Array<Hash{String => String, Number}>
Pause the execution of this tube for specified
delay. -
#peek(state) ⇒ Beaneater::Job
Peek at next job within this tube in given
state. -
#put(body, options = {}) ⇒ Hash{String => String, Number}
Inserts job with specified body onto tube.
-
#reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job
Reserves the next job from tube.
-
#safe_use(&block) ⇒ Object
protected
Transmits a beanstalk command that requires this tube to be set as used.
-
#stats ⇒ Beaneater::StatStruct
Returns related stats for this tube.
-
#to_s ⇒ String
(also: #inspect)
String representation of tube.
-
#transmit(command, options = {}) ⇒ Object
Delegates transmit to the connection object.
Constructor Details
#initialize(client, name) ⇒ Tube
Fetches the specified tube.
20 21 22 23 24 |
# File 'lib/beaneater/tube/record.rb', line 20 def initialize(client, name) @client = client @name = name.to_s @mutex = Mutex.new end |
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
11 |
# File 'lib/beaneater/tube/record.rb', line 11 attr_reader :name, :client |
#name ⇒ String
Returns name of the tube.
11 12 13 |
# File 'lib/beaneater/tube/record.rb', line 11 def name @name end |
Instance Method Details
#clear ⇒ Object
Clears all unreserved jobs in all states from the tube
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/beaneater/tube/record.rb', line 134 def clear client.tubes.watch!(self.name) %w(delayed buried ready).each do |state| while job = self.peek(state.to_sym) begin job.delete rescue Beaneater::UnexpectedResponse, Beaneater::NotFoundError # swallow any issues end end end client.tubes.ignore(name) rescue Beaneater::NotIgnoredError # swallow any issues end |
#config ⇒ Beaneater::Configuration (protected)
Returns configuration options for beaneater
182 183 184 |
# File 'lib/beaneater/tube/record.rb', line 182 def config Beaneater.configuration end |
#kick(bounds = 1) ⇒ Hash{String => String, Number}
Kick specified number of jobs from buried to ready state.
101 102 103 |
# File 'lib/beaneater/tube/record.rb', line 101 def kick(bounds=1) safe_use { transmit("kick #{bounds}") } end |
#pause(delay) ⇒ Array<Hash{String => String, Number}>
Pause the execution of this tube for specified delay.
125 126 127 |
# File 'lib/beaneater/tube/record.rb', line 125 def pause(delay) transmit("pause-tube #{name} #{delay}") end |
#peek(state) ⇒ Beaneater::Job
Peek at next job within this tube in given state.
68 69 70 71 72 73 74 75 76 |
# File 'lib/beaneater/tube/record.rb', line 68 def peek(state) safe_use do res = transmit("peek-#{state}") Job.new(client, res) end rescue Beaneater::NotFoundError # Return nil if not found nil end |
#put(body, options = {}) ⇒ Hash{String => String, Number}
Inserts job with specified body onto tube.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/beaneater/tube/record.rb', line 45 def put(body, ={}) safe_use do serialized_body = config.job_serializer.call(body) = { :pri => config.default_put_pri, :delay => config.default_put_delay, :ttr => config.default_put_ttr }.merge() = "#{[:pri]} #{[:delay]} #{[:ttr]} #{serialized_body.bytesize}" transmit("put #{}\r\n#{serialized_body}") end end |
#reserve(timeout = nil, &block) {|job| ... } ⇒ Beaneater::Job
Reserves the next job from tube.
88 89 90 91 |
# File 'lib/beaneater/tube/record.rb', line 88 def reserve(timeout=nil, &block) client.tubes.watch!(self.name) client.tubes.reserve(timeout, &block) end |
#safe_use(&block) ⇒ Object (protected)
Transmits a beanstalk command that requires this tube to be set as used.
171 172 173 174 175 176 177 |
# File 'lib/beaneater/tube/record.rb', line 171 def safe_use(&block) @mutex.lock client.tubes.use(self.name) yield ensure @mutex.unlock end |
#stats ⇒ Beaneater::StatStruct
Returns related stats for this tube.
112 113 114 115 |
# File 'lib/beaneater/tube/record.rb', line 112 def stats res = transmit("stats-tube #{name}") StatStruct.from_hash(res[:body]) end |
#to_s ⇒ String Also known as: inspect
String representation of tube.
156 157 158 |
# File 'lib/beaneater/tube/record.rb', line 156 def to_s "#<Beaneater::Tube name=#{name.inspect}>" end |
#transmit(command, options = {}) ⇒ Object
Delegates transmit to the connection object.
29 30 31 |
# File 'lib/beaneater/tube/record.rb', line 29 def transmit(command, ={}) client.connection.transmit(command, **) end |