Class: EM::Beanstalk
- Inherits:
-
Object
show all
- Defined in:
- lib/em-beanstalk.rb,
lib/em-beanstalk/job.rb,
lib/em-beanstalk/shell.rb,
lib/em-beanstalk/connection.rb
Defined Under Namespace
Modules: VERSION
Classes: Body, Connection, Job, Shell
Constant Summary
collapse
- Disconnected =
Class.new(RuntimeError)
- InvalidCommand =
Class.new(RuntimeError)
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_deferrable(&block) ⇒ Object
-
#close ⇒ Object
-
#connected ⇒ Object
-
#delete(val, &block) ⇒ Object
-
#disconnected ⇒ Object
-
#drain!(&block) ⇒ Object
-
#each_job(timeout = nil, &block) ⇒ Object
-
#extract_body(bytes, data) ⇒ Object
-
#ignore(tube, &block) ⇒ Object
-
#initialize(opts = nil) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
-
#job_id(val) ⇒ Object
-
#list(type = nil, &block) ⇒ Object
-
#on_error(&block) ⇒ Object
-
#peek(id, &block) ⇒ Object
-
#put(msg, opts = nil, &block) ⇒ Object
-
#received(data) ⇒ Object
-
#reconnect ⇒ Object
-
#release(val, opts = nil, &block) ⇒ Object
-
#reserve(timeout = nil, &block) ⇒ Object
-
#stats(type = nil, val = nil, &block) ⇒ Object
-
#use(tube, &block) ⇒ Object
-
#watch(tube, &block) ⇒ Object
Constructor Details
#initialize(opts = nil) ⇒ Beanstalk
Returns a new instance of Beanstalk.
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/em-beanstalk.rb', line 25
# Creates a client and immediately opens the EventMachine connection.
#
# opts may be nil or a Hash with any of :host, :port, :tube, :retry_count,
# :default_priority, :default_delay, :default_ttr, :timeout,
# :default_error_callback and :raise_on_disconnect. Missing (or nil/false)
# values fall back to the defaults written out below.
def initialize(opts = nil)
  options = opts || {}

  @host = options[:host] || 'localhost'
  @port = options[:port] || 11300
  @tube = options[:tube] || 'default'
  @retry_count = options[:retry_count] || 5
  @default_priority = options[:default_priority] || 65536
  @default_delay = options[:default_delay] || 0
  @default_ttr = options[:default_ttr] || 300
  @default_timeout = options[:timeout] || 5
  @default_error_callback = options[:default_error_callback] ||
                            Proc.new { |error| puts "ERROR: #{error.inspect}" }
  # An explicit false must be honoured here, so test for the key instead of
  # relying on || like the settings above.
  @raise_on_disconnect =
    if options.key?(:raise_on_disconnect)
      options[:raise_on_disconnect]
    else
      true
    end

  @watched_tubes = []
  @data = ""
  @retries = 0
  @in_reserve = false
  @deferrables = []

  @conn = EM.connect(host, port, EM::Beanstalk::Connection) do |connection|
    connection.client = self
    connection.comm_inactivity_timeout = 0
    connection.pending_connect_timeout = @default_timeout
  end

  return unless @tube

  use(@tube)
  watch(@tube)
end
|
Instance Attribute Details
#default_delay ⇒ Object
Returns the value of attribute default_delay.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
# Returns @default_delay. #initialize sets it from opts[:default_delay],
# falling back to 0; #put and #release use it when no :delay is given.
def default_delay
@default_delay
end
|
#default_error_callback ⇒ Object
Returns the value of attribute default_error_callback.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
# Returns @default_error_callback. #initialize sets it from
# opts[:default_error_callback], falling back to a proc that prints the error;
# #add_deferrable passes it to every Defer it creates.
def default_error_callback
@default_error_callback
end
|
#default_priority ⇒ Object
Returns the value of attribute default_priority.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
# Returns @default_priority. #initialize sets it from opts[:default_priority],
# falling back to 65536; #put and #release use it when no :priority is given.
def default_priority
@default_priority
end
|
#default_ttr ⇒ Object
Returns the value of attribute default_ttr.
23
24
25
|
# File 'lib/em-beanstalk.rb', line 23
# Returns @default_ttr. #initialize sets it from opts[:default_ttr], falling
# back to 300; #put uses it when no :ttr is given.
def default_ttr
@default_ttr
end
|
#host ⇒ Object
Returns the value of attribute host.
22
23
24
|
# File 'lib/em-beanstalk.rb', line 22
# Returns @host, the host name #initialize takes from opts[:host]
# ('localhost' when absent) and that #initialize and #reconnect connect to.
def host
@host
end
|
#port ⇒ Object
Returns the value of attribute port.
22
23
24
|
# File 'lib/em-beanstalk.rb', line 22
# Returns @port, the port #initialize takes from opts[:port] (11300 when
# absent) and that #initialize and #reconnect connect to.
def port
@port
end
|
Instance Method Details
#add_deferrable(&block) ⇒ Object
206
207
208
209
210
|
# File 'lib/em-beanstalk.rb', line 206
# Builds a Defer from the default error callback and the caller's block,
# appends it to the queue of pending deferrables and returns it.
def add_deferrable(&block)
  Defer.new(default_error_callback, &block).tap do |pending|
    @deferrables.push(pending)
  end
end
|
#close ⇒ Object
56
57
58
59
|
# File 'lib/em-beanstalk.rb', line 56
# Closes the connection on purpose.
def close
# Mark the disconnect as intentional first: #disconnected checks this flag
# and skips its raise/retry logic when it is set.
@disconnect_manually = true
@conn.close_connection
end
|
#connected ⇒ Object
188
189
190
|
# File 'lib/em-beanstalk.rb', line 188
# Resets the retry counter that #disconnected increments before each
# reconnect attempt.
# NOTE(review): presumably invoked by the connection once it is established;
# confirm in connection.rb.
def connected
@retries = 0
end
|
#delete(val, &block) ⇒ Object
141
142
143
144
145
|
# File 'lib/em-beanstalk.rb', line 141
# Sends a delete command for +val+ (a Job or a raw job id, resolved through
# #job_id) and returns the deferrable queued for the response.
# Does nothing and returns nil when +val+ is nil or false.
def delete(val, &block)
  if val
    id = job_id(val)
    @conn.send(:delete, id)
    add_deferrable(&block)
  end
end
|
#disconnected ⇒ Object
192
193
194
195
196
197
198
199
|
# File 'lib/em-beanstalk.rb', line 192
# Handles a lost connection: fails every pending deferrable with
# :disconnected and, unless the connection was closed via #close, schedules a
# reconnect one second later.
#
# Raises EM::Beanstalk::Disconnected once @retries has reached @retry_count
# and @raise_on_disconnect is truthy.
def disconnected
  # Detach the queue before failing it. The responses these deferrables were
  # waiting for will never arrive, so leaving them queued would make
  # #received hand post-reconnect responses to stale deferrables. Swapping
  # first also keeps iteration safe if an errback queues a new command.
  pending, @deferrables = @deferrables, []
  pending.each { |d| d.fail(:disconnected) }

  return if @disconnect_manually

  raise EM::Beanstalk::Disconnected if @retries >= @retry_count && @raise_on_disconnect
  @retries += 1
  EM.add_timer(1) { reconnect }
end
|
#drain!(&block) ⇒ Object
61
62
63
64
65
66
67
|
# File 'lib/em-beanstalk.rb', line 61
# Reserves and deletes jobs one at a time until the server's
# 'current-jobs-ready' statistic is zero, then schedules +block+ on the next
# EventMachine tick.
def drain!(&block)
  stats do |server_stats|
    if server_stats['current-jobs-ready'].zero?
      EM.next_tick(&block)
    else
      # Re-check the statistics only after the delete has been acknowledged.
      reserve do |job|
        job.delete { drain!(&block) }
      end
    end
  end
end
|
#each_job(timeout = nil, &block) ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/em-beanstalk.rb', line 100
# Reserves jobs in a loop, passing each one to +block+ and issuing the next
# reserve on the following EventMachine tick. +timeout+ is forwarded to
# #reserve unchanged.
#
# Only the success callback is hooked, so the loop stops as soon as a reserve
# fails. Returns the deferrable of the first reserve.
def each_job(timeout = nil, &block)
  reserve_next = lambda do
    reservation = reserve(timeout)
    reservation.callback do |job|
      block.call(job)
      EM.next_tick { reserve_next.call }
    end
    reservation
  end

  reserve_next.call
end
|
#extract_body(bytes, data) ⇒ Object
279
280
281
282
283
284
285
286
287
288
289
|
# File 'lib/em-beanstalk.rb', line 279
# Splits a response that carries a payload into the payload and the remaining
# unparsed buffer.
#
# bytes - payload length announced on the response's first line.
# data  - buffer starting at that first line; it must contain the line's
#         terminating "\r\n".
#
# Returns a Body built from (payload, rest of buffer), or nil while the
# payload or the "\r\n" that terminates it has not been fully received.
def extract_body(bytes, data)
  payload = data[(data.index(/\r\n/) + 2)..-1]

  # Wait for the trailing "\r\n" as well. Otherwise it would arrive later as
  # a stray empty line that the response parser cannot get past.
  return nil if payload.length < bytes + 2

  # NOTE(review): lengths are counted in characters; this assumes the buffer
  # holds binary/single-byte data as delivered by the socket. Confirm.
  # payload[0, bytes] (rather than 0..bytes-1) also handles an empty body.
  Body.new(payload[0, bytes], payload[(bytes + 2)..-1])
end
|
#ignore(tube, &block) ⇒ Object
84
85
86
87
88
89
|
# File 'lib/em-beanstalk.rb', line 84
# Stops watching +tube+: removes it from the local list of watched tubes,
# sends the ignore command and returns the deferrable queued for the response.
# Returns nil without sending anything when the tube is not being watched.
def ignore(tube, &block)
  if @watched_tubes.include?(tube)
    @watched_tubes.delete(tube)
    @conn.send(:ignore, tube)
    add_deferrable(&block)
  end
end
|
#job_id(val) ⇒ Object
122
123
124
125
126
127
128
129
|
# File 'lib/em-beanstalk.rb', line 122
# Normalises a job reference: a Job yields its id, anything else is returned
# unchanged (and is taken to be the id already).
def job_id(val)
  val.is_a?(Job) ? val.id : val
end
|
#list(type = nil, &block) ⇒ Object
131
132
133
134
135
136
137
138
139
|
# File 'lib/em-beanstalk.rb', line 131
# Sends one of the tube-listing commands and returns the deferrable queued
# for the response.
#
# type - nil, :tube or :tubes  -> list-tubes
#        :use or :used         -> list-tube-used
#        :watch or :watched    -> list-tubes-watched
#
# Raises EM::Beanstalk::InvalidCommand for any other type; nothing is sent in
# that case.
def list(type = nil, &block)
  command =
    case type
    when nil, :tube, :tubes then :'list-tubes'
    when :use, :used then :'list-tube-used'
    when :watch, :watched then :'list-tubes-watched'
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(command)
  add_deferrable(&block)
end
|
#on_error(&block) ⇒ Object
212
213
214
|
# File 'lib/em-beanstalk.rb', line 212
# Registers +block+ as the error callback for deferrables created from now on.
#
# #add_deferrable passes #default_error_callback to each Defer it builds, so
# the block has to be stored there to take effect. @error_callback is still
# assigned for backward compatibility. Calling this without a block leaves
# the current default callback in place.
#
# Returns the block.
def on_error(&block)
  @default_error_callback = block if block
  @error_callback = block
end
|
#peek(id, &block) ⇒ Object
147
148
149
150
151
152
153
154
155
|
# File 'lib/em-beanstalk.rb', line 147
# Sends a peek command and returns the deferrable queued for the response.
#
# id - :ready, :delayed or :buried select the matching peek-<state> command;
#      any other value is sent as the argument of a plain peek.
def peek(id, &block)
  state_commands = {
    :ready => :'peek-ready',
    :delayed => :'peek-delayed',
    :buried => :'peek-buried'
  }

  if state_commands.key?(id)
    @conn.send(state_commands[id])
  else
    @conn.send(:'peek', id)
  end
  add_deferrable(&block)
end
|
#put(msg, opts = nil, &block) ⇒ Object
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/em-beanstalk.rb', line 157
# Queues a job on the server and returns the deferrable for the response.
#
# msg  - a Job (its priority, delay, ttr and body are used) or any object,
#        whose #to_s becomes the job body.
# opts - optional Hash; :priority, :delay and :ttr override the values taken
#        from the Job or from the client defaults.
#
# Negative values are replaced by the client defaults, and the priority is
# capped at the largest value the protocol accepts.
def put(msg, opts = nil, &block)
  case msg
  when Job
    priority = opts && opts[:priority] || msg.priority
    delay = opts && opts[:delay] || msg.delay
    ttr = opts && opts[:ttr] || msg.ttr
    body = msg.body.to_s
  else
    priority = opts && opts[:priority] || default_priority
    delay = opts && opts[:delay] || default_delay
    ttr = opts && opts[:ttr] || default_ttr
    body = msg.to_s
  end

  # The protocol requires pri < 2**32, so the cap is 2**32 - 1.
  max_priority = 2**32 - 1
  priority = default_priority if priority < 0
  priority = max_priority if priority > max_priority
  delay = default_delay if delay < 0
  ttr = default_ttr if ttr < 0

  # The length field is a byte count; String#size counts characters and is
  # too small for multibyte bodies.
  @conn.send_with_data(:put, body, priority, delay, ttr, body.bytesize)
  add_deferrable(&block)
end
|
#received(data) ⇒ Object
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
# File 'lib/em-beanstalk.rb', line 216
# Appends +data+ to the receive buffer and resolves pending deferrables, in
# order, for every complete response the buffer now holds.
#
# Each response consumes the deferrable at the head of @deferrables. Parsing
# stops when the buffer holds no complete line, when a payload has not fully
# arrived yet, or when the first line is not a recognised response.
def received(data)
  @data << data
  until @data.empty?
    # A full "\r\n"-terminated line is needed before anything can be decided.
    break if @data.index(/(.*?)\r\n/).nil?
    first = $1

    case first
    when /^DELETED/, /^RELEASED/
      @deferrables.shift.succeed
    when /^INSERTED\s+(\d+)/
      id = $1.to_i
      @deferrables.shift.succeed(id)
    when /^BURIED\s+(\d+)/
      id = $1.to_i
      @deferrables.shift.fail(:buried, id)
    when /^USING\s+(.*)/
      tube = $1
      @deferrables.shift.succeed(tube)
    when /^WATCHING\s+(\d+)/
      count = $1.to_i
      @deferrables.shift.succeed(count)
    when /^OK\s+(\d+)/
      bytes = $1.to_i
      body = extract_body(bytes, @data)
      break unless body # payload incomplete; wait for more data

      # extract_body already dropped the status line and the payload.
      @data = body.data
      # NOTE(review): YAML.load on bytes read from the socket; consider
      # YAML.safe_load if the server cannot be fully trusted.
      @deferrables.shift.succeed(YAML.load(body.body))
      next
    when /^(RESERVED|FOUND)\s+(\d+)\s+(\d+)/
      id = $2.to_i
      bytes = $3.to_i
      body = extract_body(bytes, @data)
      break unless body # payload incomplete; wait for more data

      @data = body.data
      @deferrables.shift.succeed(EM::Beanstalk::Job.new(self, id, body.body))
      next
    when /^(OUT_OF_MEMORY|INTERNAL_ERROR|DRAINING|BAD_FORMAT|UNKNOWN_COMMAND|EXPECTED_CRLF|JOB_TOO_BIG|DEADLINE_SOON|TIMED_OUT|NOT_FOUND)/
      error = $1.downcase.to_sym
      puts "... got error, calling df."
      @deferrables.shift.fail(error)
      # Do not strip the line here: the common slice! below removes it.
      # Stripping it twice ate the start of the next pipelined response.
    else
      break
    end

    # Drop the status line just handled, including its "\r\n".
    @data.slice!(0, first.size + 2)
  end
end
|
#reconnect ⇒ Object
201
202
203
204
|
# File 'lib/em-beanstalk.rb', line 201
# Re-opens the connection to the configured host and port.
def reconnect
# Clear the manual-close flag so that a later loss of connection goes through
# the retry logic in #disconnected again.
@disconnect_manually = false
@conn.reconnect(host, port)
end
|
#release(val, opts = nil, &block) ⇒ Object
180
181
182
183
184
185
186
|
# File 'lib/em-beanstalk.rb', line 180
# Sends a release command for +val+ (a Job or raw id, resolved through
# #job_id) and returns the deferrable queued for the response.
#
# opts may carry :delay and :priority; missing values come from the client
# defaults, and both are converted with #to_i before being sent.
# Returns nil without sending anything when +val+ is nil.
def release(val, opts = nil, &block)
  unless val.nil?
    options = opts || {}
    delay = options[:delay] || default_delay
    priority = options[:priority] || default_priority
    @conn.send(:release, job_id(val), priority.to_i, delay.to_i)
    add_deferrable(&block)
  end
end
|
#reserve(timeout = nil, &block) ⇒ Object
91
92
93
94
95
96
97
98
|
# File 'lib/em-beanstalk.rb', line 91
# Asks the server for a job and returns the deferrable queued for the
# response. With a truthy +timeout+ the reserve-with-timeout command is used;
# otherwise a plain reserve is sent.
def reserve(timeout = nil, &block)
  command = timeout ? [:'reserve-with-timeout', timeout] : [:reserve]
  @conn.send(*command)
  add_deferrable(&block)
end
|
#stats(type = nil, val = nil, &block) ⇒ Object
112
113
114
115
116
117
118
119
120
|
# File 'lib/em-beanstalk.rb', line 112
# Requests statistics and returns the deferrable queued for the response.
#
# type - nil   -> stats
#        :tube -> stats-tube for +val+
#        :job  -> stats-job for +val+ (a Job or raw id, via #job_id)
#
# Raises EM::Beanstalk::InvalidCommand for any other type; nothing is sent in
# that case.
def stats(type = nil, val = nil, &block)
  request =
    case type
    when nil then [:stats]
    when :tube then [:'stats-tube', val]
    when :job then [:'stats-job', job_id(val)]
    else raise EM::Beanstalk::InvalidCommand.new
    end

  @conn.send(*request)
  add_deferrable(&block)
end
|
#use(tube, &block) ⇒ Object
69
70
71
72
73
74
75
|
# File 'lib/em-beanstalk.rb', line 69
# Selects +tube+ for subsequent puts: records it locally, sends the use
# command and returns the deferrable queued for the response.
# Returns nil without sending anything when the tube is already selected.
def use(tube, &block)
  unless @used_tube == tube
    @used_tube = tube
    @conn.send(:use, tube)
    add_deferrable(&block)
  end
end
|
#watch(tube, &block) ⇒ Object
77
78
79
80
81
82
|
# File 'lib/em-beanstalk.rb', line 77
# Starts watching +tube+: adds it to the local list of watched tubes, sends
# the watch command and returns the deferrable queued for the response.
# Returns nil without sending anything when the tube is already watched.
def watch(tube, &block)
  unless @watched_tubes.include?(tube)
    @watched_tubes.push(tube)
    @conn.send(:watch, tube)
    add_deferrable(&block)
  end
end
|