Module: Creeper
- Defined in:
- lib/creeper/worker.rb,
lib/creeper.rb,
lib/creeper/creep.rb,
lib/creeper/version.rb,
lib/creeper/launcher.rb,
lib/creeper/err_logger.rb,
lib/creeper/out_logger.rb
Overview
Defined Under Namespace
Modules: Creep, ErrLogger, Launcher, OutLogger
Classes: BadURL, Worker
Constant Summary
collapse
- HANDLERS =
{
named: {},
before_each: [],
before_named: {},
after_each: [],
after_named: {},
error_each: [],
error_named: {},
finalizers: []
}
- WORKERS =
{}
- VERSION =
"1.0.9"
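Class Attribute Summary
collapse
-
.lock ⇒ Object
-
.patience_hard ⇒ Object
-
.patience_soft ⇒ Object
-
.pool_size ⇒ Object
-
.reserve_timeout ⇒ Object
-
.retry_count ⇒ Object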
Class Method Summary
collapse
-
.after(name = nil, &block) ⇒ Object
-
.after_handlers_for(name) ⇒ Object
-
.all_jobs ⇒ Object
-
.beanstalk ⇒ Object
-
.beanstalk_addresses ⇒ Object
-
.beanstalk_url ⇒ Object
-
.beanstalk_url=(beanstalk_url) ⇒ Object
-
.before(name = nil, &block) ⇒ Object
-
.before_handlers_for(name) ⇒ Object
-
.connect(addresses = nil) ⇒ Object
-
.disconnect ⇒ Object
-
.drop(name) ⇒ Object
-
.enqueue(job, data = {}, options = {}) ⇒ Object
-
.enqueue!(job, data = {}, options = {}) ⇒ Object
-
.err_logger ⇒ Object
-
.err_logger=(err_logger) ⇒ Object
-
.error(name = nil, &block) ⇒ Object
-
.error_handlers_for(name) ⇒ Object
-
.error_work(worker, data, name, job) ⇒ Object
-
.finalizer(&block) ⇒ Object
-
.finalizers ⇒ Object
-
.handler_for(name) ⇒ Object
-
.job(name, &block) ⇒ Object
-
.out_logger ⇒ Object
-
.out_logger=(out_logger) ⇒ Object
-
.register_worker(worker) ⇒ Object
-
.shutdown=(shutdown) ⇒ Object
-
.shutdown? ⇒ Boolean
-
.shutdown_workers ⇒ Object
-
.start_work(worker, data, name, job) ⇒ Object
-
.stop_work(worker, data, name, job) ⇒ Object
-
.unregister_worker(worker, reason = nil) ⇒ Object
-
.work(jobs = nil, size = nil) ⇒ Object
-
.worker_pool ⇒ Object
-
.worker_pool=(worker_pool) ⇒ Object
Class Attribute Details
.lock ⇒ Object
Returns the value of attribute lock.
43
44
45
|
# File 'lib/creeper.rb', line 43
# Reader for the module-level @lock.
# Other methods on this page wrap shared state in lock.synchronize, so this
# is presumably a Mutex-like object; where it is created is not shown here.
def lock
@lock
end
|
.patience_hard ⇒ Object
Returns the value of attribute patience_hard.
44
45
46
|
# File 'lib/creeper.rb', line 44
# Reader for the module-level @patience_hard.
# shutdown_workers passes this value to hard_shutdown_workers; presumably a
# timeout for the hard shutdown phase -- confirm units against that helper.
def patience_hard
@patience_hard
end
|
.patience_soft ⇒ Object
Returns the value of attribute patience_soft.
44
45
46
|
# File 'lib/creeper.rb', line 44
# Reader for the module-level @patience_soft.
# shutdown_workers passes this value to soft_shutdown_workers; presumably a
# timeout for the soft shutdown phase -- confirm units against that helper.
def patience_soft
@patience_soft
end
|
.pool_size ⇒ Object
Returns the value of attribute pool_size.
44
45
46
|
# File 'lib/creeper.rb', line 44
# Reader for the module-level @pool_size.
# NOTE(review): presumably the number of workers in the pool; its consumer is
# not visible in this excerpt -- confirm.
def pool_size
@pool_size
end
|
.reserve_timeout ⇒ Object
Returns the value of attribute reserve_timeout.
44
45
46
|
# File 'lib/creeper.rb', line 44
# Reader for the module-level @reserve_timeout.
# NOTE(review): presumably the timeout used when reserving a job from
# beanstalk; its consumer is not visible in this excerpt -- confirm.
def reserve_timeout
@reserve_timeout
end
|
.retry_count ⇒ Object
Returns the value of attribute retry_count.
44
45
46
|
# File 'lib/creeper.rb', line 44
# Reader for the module-level @retry_count.
# NOTE(review): presumably the number of retries for a failing job; its
# consumer is not visible in this excerpt -- confirm.
def retry_count
@retry_count
end
|
Class Method Details
.after(name = nil, &block) ⇒ Object
196
197
198
199
200
201
202
203
204
205
206
|
# File 'lib/creeper.rb', line 196
# Registers +block+ as an "after" callback.
#
# With no name (or the name :each) the block is appended to
# HANDLERS[:after_each]. With any other name it is appended to that name's
# list in HANDLERS[:after_named].
#
# The per-name list is created on demand, so a callback can be registered
# before the job has been declared with .job (or after it was dropped);
# previously that raised NoMethodError on nil.
#
# @param name [Object, nil] job name, :each, or nil
# @return [Array] the list the block was appended to
def after(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:after_named][name] ||= []) << block
    else
      HANDLERS[:after_each] << block
    end
  end
end
|
.after_handlers_for(name) ⇒ Object
208
209
210
211
212
|
# File 'lib/creeper.rb', line 208
# Returns the "after" callbacks that apply to +name+: every entry of
# HANDLERS[:after_each] followed by the entries registered for +name+.
#
# A name with no registered list yields only the global callbacks
# (previously Array + nil raised TypeError).
#
# @param name [Object] job name
# @return [Array] a new array; the stored lists are not modified
def after_handlers_for(name)
  lock.synchronize do
    HANDLERS[:after_each] + (HANDLERS[:after_named][name] || [])
  end
end
|
.all_jobs ⇒ Object
146
147
148
149
150
|
# File 'lib/creeper.rb', line 146
# Lists the names of every job that currently has a handler in
# HANDLERS[:named]. The lookup runs while holding the lock.
#
# @return [Array] job names
def all_jobs
  lock.synchronize { HANDLERS[:named].keys }
end
|
.beanstalk ⇒ Object
110
111
112
|
# File 'lib/creeper.rb', line 110
# Returns the connection cached in the current thread's
# :beanstalk_pool_connection slot, calling connect to create and cache one
# when the slot is empty.
def beanstalk
  cached = Thread.current[:beanstalk_pool_connection]
  return cached if cached

  Thread.current[:beanstalk_pool_connection] = connect
end
|
.beanstalk_addresses ⇒ Object
114
115
116
117
118
119
|
# File 'lib/creeper.rb', line 114
# Splits beanstalk_url on runs of whitespace and/or commas and converts each
# piece with beanstalk_host_and_port.
#
# @return [Array] one converted entry per URL, in order
def beanstalk_addresses
  beanstalk_url.split(/[\s,]+/).map { |uri| beanstalk_host_and_port(uri) }
end
|
.beanstalk_url ⇒ Object
46
47
48
49
50
|
# File 'lib/creeper.rb', line 46
# Returns the configured beanstalk URL string. On first use it is taken from
# the BEANSTALK_URL environment variable, falling back to the local default,
# and then cached in @beanstalk_url.
def beanstalk_url
  lock.synchronize do
    @beanstalk_url ||= ENV.fetch('BEANSTALK_URL', 'beanstalk://127.0.0.1/')
  end
end
|
.beanstalk_url=(beanstalk_url) ⇒ Object
52
53
54
55
56
|
# File 'lib/creeper.rb', line 52
# Stores +beanstalk_url+ in @beanstalk_url while holding the lock.
def beanstalk_url=(beanstalk_url)
  lock.synchronize { @beanstalk_url = beanstalk_url }
end
|
.before(name = nil, &block) ⇒ Object
178
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/creeper.rb', line 178
# Registers +block+ as a "before" callback.
#
# With no name (or the name :each) the block is appended to
# HANDLERS[:before_each]. With any other name it is appended to that name's
# list in HANDLERS[:before_named].
#
# The per-name list is created on demand, so a callback can be registered
# before the job has been declared with .job (or after it was dropped);
# previously that raised NoMethodError on nil.
#
# @param name [Object, nil] job name, :each, or nil
# @return [Array] the list the block was appended to
def before(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:before_named][name] ||= []) << block
    else
      HANDLERS[:before_each] << block
    end
  end
end
|
.before_handlers_for(name) ⇒ Object
190
191
192
193
194
|
# File 'lib/creeper.rb', line 190
# Returns the "before" callbacks that apply to +name+: every entry of
# HANDLERS[:before_each] followed by the entries registered for +name+.
#
# A name with no registered list yields only the global callbacks
# (previously Array + nil raised TypeError).
#
# @param name [Object] job name
# @return [Array] a new array; the stored lists are not modified
def before_handlers_for(name)
  lock.synchronize do
    HANDLERS[:before_each] + (HANDLERS[:before_named][name] || [])
  end
end
|
.connect(addresses = nil) ⇒ Object
121
122
123
|
# File 'lib/creeper.rb', line 121
# Builds a new Beanstalk::Pool.
#
# @param addresses [Object, nil] addresses handed to the pool; when nil (or
#   false) beanstalk_addresses is used instead
# @return [Beanstalk::Pool]
def connect(addresses = nil)
  targets = addresses || beanstalk_addresses
  Beanstalk::Pool.new(targets)
end
|
.disconnect ⇒ Object
125
126
127
128
|
# File 'lib/creeper.rb', line 125
# Closes the current thread's cached connection and clears the slot.
# Closing is best-effort: any StandardError (including the NoMethodError
# raised when nothing is cached) is ignored.
#
# @return [nil]
def disconnect
  begin
    Thread.current[:beanstalk_pool_connection].close
  rescue StandardError
    nil
  end
  Thread.current[:beanstalk_pool_connection] = nil
end
|
.drop(name) ⇒ Object
162
163
164
165
166
167
168
169
170
|
# File 'lib/creeper.rb', line 162
# Removes the handler for +name+ together with its named before/after/error
# callback lists. Global (each) callbacks are left alone.
#
# @param name [Object] job name
# @return [true]
def drop(name)
  lock.synchronize do
    [:named, :before_named, :after_named, :error_named].each do |section|
      HANDLERS[section].delete(name)
    end
    true
  end
end
|
.enqueue(job, data = {}, options = {}) ⇒ Object
248
249
250
251
252
253
254
|
# File 'lib/creeper.rb', line 248
# Enqueues a job via enqueue!, handing off to disconnected(...) when
# Beanstalk::NotConnected is raised. When $DEBUG is set, a debug line with
# the current actor's number (if any) is logged first.
#
# @param job [Object] job name
# @param data [Object] job payload
# @param options [Hash] forwarded to enqueue!
# @return [Object] the result of enqueue!, or of disconnected on failure
def enqueue(job, data = {}, options = {})
  if $DEBUG
    actor = Thread.current[:actor]
    worker_number = actor ? Thread.current[:actor].subject.number : nil
    OutLogger.debug "[#{worker_number}] Enqueueing #{job.inspect}, #{data.inspect}"
  end
  enqueue!(job, data, options)
rescue Beanstalk::NotConnected
  disconnected(self, :enqueue, job, data, options)
end
|
.enqueue!(job, data = {}, options = {}) ⇒ Object
256
257
258
259
260
261
262
263
|
# File 'lib/creeper.rb', line 256
# Puts a job on the beanstalk tube named after +job+; errors propagate.
#
# The body sent is the JSON encoding of [job, data].
#
# @param job [Object] job name, also used as the tube name
# @param data [Object] JSON-serialisable payload
# @param options [Hash] :priority / :pri (default 65536), :delay (coerced
#   with to_i, never below 0), :time_to_run / :ttr (default 120)
# @return [Object] whatever beanstalk.put returns
def enqueue!(job, data = {}, options = {})
  pri = options[:priority] || options[:pri] || 65536
  ttr = options[:time_to_run] || options[:ttr] || 120
  wait = options[:delay].to_i
  wait = 0 if wait < 0

  beanstalk.use(job)
  beanstalk.put(JSON.dump([job, data]), pri, wait, ttr)
end
|
.err_logger ⇒ Object
58
59
60
61
62
|
# File 'lib/creeper.rb', line 58
# Returns the error logger, creating a ::Logger on $stderr the first time it
# is requested if none has been assigned.
def err_logger
  lock.synchronize { @err_logger ||= ::Logger.new($stderr) }
end
|
.err_logger=(err_logger) ⇒ Object
64
65
66
67
68
|
# File 'lib/creeper.rb', line 64
# Stores +err_logger+ in @err_logger while holding the lock.
def err_logger=(err_logger)
  lock.synchronize { @err_logger = err_logger }
end
|
.error(name = nil, &block) ⇒ Object
214
215
216
217
218
219
220
221
222
223
224
|
# File 'lib/creeper.rb', line 214
# Registers +block+ as an "error" callback.
#
# With no name (or the name :each) the block is appended to
# HANDLERS[:error_each]. With any other name it is appended to that name's
# list in HANDLERS[:error_named].
#
# The per-name list is created on demand, so a callback can be registered
# before the job has been declared with .job (or after it was dropped);
# previously that raised NoMethodError on nil.
#
# @param name [Object, nil] job name, :each, or nil
# @return [Array] the list the block was appended to
def error(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:error_named][name] ||= []) << block
    else
      HANDLERS[:error_each] << block
    end
  end
end
|
.error_handlers_for(name) ⇒ Object
226
227
228
229
230
|
# File 'lib/creeper.rb', line 226
# Returns the "error" callbacks that apply to +name+: every entry of
# HANDLERS[:error_each] followed by the entries registered for +name+.
#
# A name with no registered list yields only the global callbacks
# (previously Array + nil raised TypeError).
#
# @param name [Object] job name
# @return [Array] a new array; the stored lists are not modified
def error_handlers_for(name)
  lock.synchronize do
    HANDLERS[:error_each] + (HANDLERS[:error_named][name] || [])
  end
end
|
.error_work(worker, data, name, job) ⇒ Object
269
270
271
272
273
274
|
# File 'lib/creeper.rb', line 269
# Marks +worker+ as stopped now and logs an error line (prefix, elapsed
# milliseconds and a dump of the job) through OutLogger.error.
#
# @return [Time] the timestamp written to worker.stopped_at
def error_work(worker, data, name, job)
  finished = Time.now
  worker.stopped_at = finished
  OutLogger.error "#{worker.prefix} Error after #{worker.time_in_milliseconds}ms #{worker.dump(job, name, data)}"
  finished
end
|
.finalizer(&block) ⇒ Object
232
233
234
235
236
|
# File 'lib/creeper.rb', line 232
# Appends +block+ to HANDLERS[:finalizers] while holding the lock.
#
# @return [Array] the finalizer list
def finalizer(&block)
  lock.synchronize { HANDLERS[:finalizers] << block }
end
|
.finalizers ⇒ Object
238
239
240
241
242
|
# File 'lib/creeper.rb', line 238
# Returns the registered finalizer list. This is the stored array itself,
# not a copy.
def finalizers
  lock.synchronize { HANDLERS[:finalizers] }
end
|
.handler_for(name) ⇒ Object
172
173
174
175
176
|
# File 'lib/creeper.rb', line 172
# Looks up the handler stored for +name+ in HANDLERS[:named].
#
# @return [Object, nil] the handler, or nil when none is stored
def handler_for(name)
  lock.synchronize { HANDLERS[:named][name] }
end
|
.job(name, &block) ⇒ Object
152
153
154
155
156
157
158
159
160
|
# File 'lib/creeper.rb', line 152
# Declares (or replaces) the handler for +name+ and makes sure the named
# before/after/error callback lists exist. Existing lists are kept.
#
# @param name [Object] job name
# @return [Proc, nil] the handler that was stored
def job(name, &block)
  lock.synchronize do
    HANDLERS[:named][name] = block
    [:before_named, :after_named, :error_named].each do |hook|
      HANDLERS[hook][name] ||= []
    end
    HANDLERS[:named][name]
  end
end
|
.out_logger ⇒ Object
70
71
72
73
74
|
# File 'lib/creeper.rb', line 70
# Returns the output logger, creating a ::Logger on $stdout the first time
# it is requested if none has been assigned.
def out_logger
  lock.synchronize { @out_logger ||= ::Logger.new($stdout) }
end
|
.out_logger=(out_logger) ⇒ Object
76
77
78
79
80
|
# File 'lib/creeper.rb', line 76
# Stores +out_logger+ in @out_logger while holding the lock.
def out_logger=(out_logger)
  lock.synchronize { @out_logger = out_logger }
end
|
.register_worker(worker) ⇒ Object
276
277
278
279
280
281
282
283
|
# File 'lib/creeper.rb', line 276
# Adds +worker+ to WORKERS under the lowest number not currently in use
# (searching 0 up to one past the highest key) and writes that number to
# worker.number.
#
# @return [Object] the worker
def register_worker(worker)
  lock.synchronize do
    taken = WORKERS.keys
    upper = (taken.max || 0) + 1
    number = (0..upper).find { |candidate| !taken.include?(candidate) }
    worker.number = number
    WORKERS[number] = worker
  end
end
|
.shutdown=(shutdown) ⇒ Object
100
101
102
103
104
|
# File 'lib/creeper.rb', line 100
# Stores +shutdown+ in @shutdown while holding the lock.
def shutdown=(shutdown)
  lock.synchronize { @shutdown = shutdown }
end
|
.shutdown? ⇒ Boolean
94
95
96
97
98
|
# File 'lib/creeper.rb', line 94
# Reports whether @shutdown is set, coerced to a strict boolean.
#
# @return [Boolean]
def shutdown?
  lock.synchronize { @shutdown ? true : false }
end
|
.shutdown_workers ⇒ Object
285
286
287
288
289
290
291
292
293
294
295
|
# File 'lib/creeper.rb', line 285
# Stops the workers in escalating steps: soft shutdown first; if that raises
# Timeout::Error, hard shutdown; if that also raises Timeout::Error, kill.
def shutdown_workers
  soft_shutdown_workers(Creeper.patience_soft)
rescue Timeout::Error
  begin
    hard_shutdown_workers(Creeper.patience_hard)
  rescue Timeout::Error
    kill_shutdown_workers
  end
end
|
.start_work(worker, data, name, job) ⇒ Object
297
298
299
300
301
|
# File 'lib/creeper.rb', line 297
# Marks +worker+ as started now and logs a "Working" line (prefix and a dump
# of the job) through OutLogger.info.
#
# @return [Time] the timestamp written to worker.started_at
def start_work(worker, data, name, job)
  began = Time.now
  worker.started_at = began
  OutLogger.info "#{worker.prefix} Working #{worker.dump(job, name, data)}"
  began
end
|
.stop_work(worker, data, name, job) ⇒ Object
303
304
305
306
307
|
# File 'lib/creeper.rb', line 303
# Marks +worker+ as stopped now and logs a "Finished" line (prefix, elapsed
# milliseconds and a dump of the job) through OutLogger.info.
#
# @return [Time] the timestamp written to worker.stopped_at
def stop_work(worker, data, name, job)
  finished = Time.now
  worker.stopped_at = finished
  OutLogger.info "#{worker.prefix} Finished in #{worker.time_in_milliseconds}ms #{worker.dump(job, name, data)}"
  finished
end
|
.unregister_worker(worker, reason = nil) ⇒ Object
309
310
311
312
313
314
315
|
# File 'lib/creeper.rb', line 309
# Logs why +worker+ is going away and removes it from WORKERS.
#
# @param worker [Object] must respond to prefix and number
# @param reason [String, nil] log text; 'Stopping' when omitted
# @return [Object, nil] the entry removed from WORKERS, if any
def unregister_worker(worker, reason = nil)
  OutLogger.info "#{worker.prefix} #{reason || 'Stopping'}"
  lock.synchronize { WORKERS.delete(worker.number) }
end
|
.work(jobs = nil, size = nil) ⇒ Object
.worker_pool ⇒ Object
82
83
84
85
86
|
# File 'lib/creeper.rb', line 82
# Reads @worker_pool while holding the lock.
def worker_pool
  lock.synchronize { @worker_pool }
end
|
.worker_pool=(worker_pool) ⇒ Object
88
89
90
91
92
|
# File 'lib/creeper.rb', line 88
# Stores +worker_pool+ in @worker_pool while holding the lock.
def worker_pool=(worker_pool)
  lock.synchronize { @worker_pool = worker_pool }
end
|