Module: Creeper

Defined in:
lib/creeper/worker.rb,
lib/creeper.rb,
lib/creeper/creep.rb,
lib/creeper/version.rb,
lib/creeper/launcher.rb,
lib/creeper/err_logger.rb,
lib/creeper/out_logger.rb

Overview

require 'em-jack'

Defined Under Namespace

Modules: Creep, ErrLogger, Launcher, OutLogger

Classes: BadURL, Worker

Constant Summary collapse

# Registry of every callback known to Creeper, grouped by kind. The *_named
# entries are hashes keyed by job name; the *_each entries and :finalizers
# are plain lists of blocks.
HANDLERS =
{
  named:        {}, # job name => block stored by .job
  before_each:  [], # blocks added by .before without a job name
  before_named: {}, # job name => list of blocks added by .before(name)
  after_each:   [], # blocks added by .after without a job name
  after_named:  {}, # job name => list of blocks added by .after(name)
  error_each:   [], # blocks added by .error without a job name
  error_named:  {}, # job name => list of blocks added by .error(name)
  finalizers:   []  # blocks added by .finalizer
}
# Registered workers, keyed by the number handed out in .register_worker.
WORKERS =
{}
# Version string of the library.
VERSION =
"1.0.9"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.lock ⇒ Object (readonly)

configuration ##



43
44
45
# File 'lib/creeper.rb', line 43

# Reader for the module-level @lock. Sibling methods call
# `lock.synchronize`, so this is presumably a Mutex-like object; where it
# is assigned is not visible here.
def lock
  @lock
end

.patience_hard ⇒ Object

Returns the value of attribute patience_hard.



44
45
46
# File 'lib/creeper.rb', line 44

# Reader for @patience_hard. .shutdown_workers passes this value to
# hard_shutdown_workers (presumably a timeout; units not shown here).
def patience_hard
  @patience_hard
end

.patience_soft ⇒ Object

Returns the value of attribute patience_soft.



44
45
46
# File 'lib/creeper.rb', line 44

# Reader for @patience_soft. .shutdown_workers passes this value to
# soft_shutdown_workers (presumably a timeout; units not shown here).
def patience_soft
  @patience_soft
end

.pool_size ⇒ Object

Returns the value of attribute pool_size.



44
45
46
# File 'lib/creeper.rb', line 44

# Reader for @pool_size. .work forwards this value to Creeper::Worker.work
# as the pool size.
def pool_size
  @pool_size
end

.reserve_timeout ⇒ Object

Returns the value of attribute reserve_timeout.



44
45
46
# File 'lib/creeper.rb', line 44

# Reader for @reserve_timeout. Its units and default are not visible in
# this file section.
def reserve_timeout
  @reserve_timeout
end

.retry_count ⇒ Object

Returns the value of attribute retry_count.



44
45
46
# File 'lib/creeper.rb', line 44

# Reader for @retry_count. How the count is consumed is not visible in
# this file section.
def retry_count
  @retry_count
end

Class Method Details

.after(name = nil, &block) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
# File 'lib/creeper.rb', line 196

# Registers +block+ as an "after" callback.
#
# With no +name+ (or the special name :each) the block is appended to the
# global :after_each list; otherwise it is appended to the list kept for
# the job +name+. That per-job list is created on demand, so a hook may be
# declared before the job itself is registered through .job (which keeps
# an already existing list).
#
# @param name [Object, nil] job name, :each, or nil for a global hook
# @return [Array] the list the block was appended to
def after(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:after_named][name] ||= []) << block
    else
      HANDLERS[:after_each] << block
    end
  end
end

.after_handlers_for(name) ⇒ Object



208
209
210
211
212
# File 'lib/creeper.rb', line 208

# Collects the "after" callbacks that apply to the job +name+.
#
# A name without a per-job list (never registered, or removed by .drop)
# contributes nothing instead of raising TypeError on `Array + nil`.
#
# @param name [Object] job name
# @return [Array] a new array: global :after_each hooks followed by the
#   hooks registered for +name+
def after_handlers_for(name)
  lock.synchronize do
    HANDLERS[:after_each] + (HANDLERS[:after_named][name] || [])
  end
end

.all_jobs ⇒ Object

handlers ##



146
147
148
149
150
# File 'lib/creeper.rb', line 146

# Lists the names of every job that currently has a handler.
#
# @return [Array] keys of HANDLERS[:named], read while holding the lock
def all_jobs
  lock.synchronize { HANDLERS[:named].keys }
end

.beanstalk ⇒ Object

connection ##



110
111
112
# File 'lib/creeper.rb', line 110

# Returns the connection cached for the current thread, calling .connect
# to create and cache one when the thread has none yet.
def beanstalk
  current = Thread.current
  current[:beanstalk_pool_connection] ||= connect
end

.beanstalk_addresses ⇒ Object



114
115
116
117
118
119
# File 'lib/creeper.rb', line 114

# Splits .beanstalk_url on runs of whitespace and/or commas and converts
# each piece with beanstalk_host_and_port.
#
# @return [Array] one converted entry per piece of the URL string
def beanstalk_addresses
  beanstalk_url.split(/[\s,]+/).map { |uri| beanstalk_host_and_port(uri) }
end

.beanstalk_url ⇒ Object



46
47
48
49
50
# File 'lib/creeper.rb', line 46

# Returns the configured beanstalk URL string.
#
# When nothing has been set yet, the value is taken from the BEANSTALK_URL
# environment variable, falling back to 'beanstalk://127.0.0.1/', and is
# remembered for later calls.
def beanstalk_url
  lock.synchronize do
    @beanstalk_url = ENV['BEANSTALK_URL'] || 'beanstalk://127.0.0.1/' unless @beanstalk_url
    @beanstalk_url
  end
end

.beanstalk_url=(beanstalk_url) ⇒ Object



52
53
54
55
56
# File 'lib/creeper.rb', line 52

# Stores a new beanstalk URL string while holding the lock.
#
# @param beanstalk_url [String] value later returned by .beanstalk_url
def beanstalk_url=(beanstalk_url)
  lock.synchronize { @beanstalk_url = beanstalk_url }
end

.before(name = nil, &block) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
# File 'lib/creeper.rb', line 178

# Registers +block+ as a "before" callback.
#
# With no +name+ (or the special name :each) the block is appended to the
# global :before_each list; otherwise it is appended to the list kept for
# the job +name+. That per-job list is created on demand, so a hook may be
# declared before the job itself is registered through .job (which keeps
# an already existing list).
#
# @param name [Object, nil] job name, :each, or nil for a global hook
# @return [Array] the list the block was appended to
def before(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:before_named][name] ||= []) << block
    else
      HANDLERS[:before_each] << block
    end
  end
end

.before_handlers_for(name) ⇒ Object



190
191
192
193
194
# File 'lib/creeper.rb', line 190

# Collects the "before" callbacks that apply to the job +name+.
#
# A name without a per-job list (never registered, or removed by .drop)
# contributes nothing instead of raising TypeError on `Array + nil`.
#
# @param name [Object] job name
# @return [Array] a new array: global :before_each hooks followed by the
#   hooks registered for +name+
def before_handlers_for(name)
  lock.synchronize do
    HANDLERS[:before_each] + (HANDLERS[:before_named][name] || [])
  end
end

.connect(addresses = nil) ⇒ Object



121
122
123
# File 'lib/creeper.rb', line 121

# Builds a new Beanstalk::Pool.
#
# @param addresses [Object, nil] addresses handed to the pool; when falsy,
#   the result of .beanstalk_addresses is used instead
def connect(addresses = nil)
  addresses ||= beanstalk_addresses
  Beanstalk::Pool.new(addresses)
end

.disconnect ⇒ Object



125
126
127
128
# File 'lib/creeper.rb', line 125

# Closes the current thread's cached connection on a best-effort basis and
# clears the cache slot.
#
# Any StandardError raised by the close (including the NoMethodError when
# no connection is cached) is deliberately swallowed.
#
# @return [nil]
def disconnect
  begin
    Thread.current[:beanstalk_pool_connection].close
  rescue StandardError
    nil
  end
  Thread.current[:beanstalk_pool_connection] = nil
end

.drop(name) ⇒ Object



162
163
164
165
166
167
168
169
170
# File 'lib/creeper.rb', line 162

# Forgets the job +name+: removes its handler and its per-job before,
# after and error hook lists. Global (*_each) hooks are left alone.
#
# @param name [Object] job name
# @return [true] always
def drop(name)
  lock.synchronize do
    [:named, :before_named, :after_named, :error_named].each do |kind|
      HANDLERS[kind].delete(name)
    end
    true
  end
end

.enqueue(job, data = {}, options = {}) ⇒ Object

queue ##



248
249
250
251
252
253
254
# File 'lib/creeper.rb', line 248

# Enqueues a job through .enqueue!, delegating to `disconnected` when the
# connection raises Beanstalk::NotConnected.
#
# When $DEBUG is set, a line naming the current actor's number (blank when
# the thread has no :actor) is logged first.
#
# @param job [Object] job name, forwarded to .enqueue!
# @param data [Object] payload, forwarded to .enqueue!
# @param options [Hash] forwarded to .enqueue!
# @return [Object] result of .enqueue!, or of `disconnected` on failure
def enqueue(job, data = {}, options = {})
  if $DEBUG
    number = Thread.current[:actor] ? Thread.current[:actor].subject.number : nil
    OutLogger.debug "[#{number}] Enqueueing #{job.inspect}, #{data.inspect}"
  end
  enqueue!(job, data, options)
rescue Beanstalk::NotConnected
  disconnected(self, :enqueue, job, data, options)
end

.enqueue!(job, data = {}, options = {}) ⇒ Object



256
257
258
259
260
261
262
263
# File 'lib/creeper.rb', line 256

# Puts a job on the tube named +job+ using the current thread's
# connection. The body is the JSON dump of `[job, data]`.
#
# @param job [Object] tube name, also embedded in the body
# @param data [Object] payload embedded in the body
# @param options [Hash] :priority / :pri (default 65536), :delay (converted
#   with to_i, negative values become 0), :time_to_run / :ttr (default 120)
# @return [Object] whatever the connection's `put` returns
def enqueue!(job, data = {}, options = {})
  pri   = options[:priority] || options[:pri] || 65536
  ttr   = options[:time_to_run] || options[:ttr] || 120
  delay = options[:delay].to_i
  delay = 0 if delay < 0

  beanstalk.use job
  beanstalk.put JSON.dump([ job, data ]), pri, delay, ttr
end

.err_logger ⇒ Object



58
59
60
61
62
# File 'lib/creeper.rb', line 58

# Returns the logger stored in @err_logger, creating a ::Logger that
# writes to $stderr on first use.
def err_logger
  lock.synchronize { @err_logger ||= ::Logger.new($stderr) }
end

.err_logger=(err_logger) ⇒ Object



64
65
66
67
68
# File 'lib/creeper.rb', line 64

# Replaces the logger returned by .err_logger, while holding the lock.
#
# @param err_logger [Object] the new logger
def err_logger=(err_logger)
  lock.synchronize { @err_logger = err_logger }
end

.error(name = nil, &block) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
# File 'lib/creeper.rb', line 214

# Registers +block+ as an "error" callback.
#
# With no +name+ (or the special name :each) the block is appended to the
# global :error_each list; otherwise it is appended to the list kept for
# the job +name+. That per-job list is created on demand, so a hook may be
# declared before the job itself is registered through .job (which keeps
# an already existing list).
#
# @param name [Object, nil] job name, :each, or nil for a global hook
# @return [Array] the list the block was appended to
def error(name = nil, &block)
  lock.synchronize do
    if name && name != :each
      (HANDLERS[:error_named][name] ||= []) << block
    else
      HANDLERS[:error_each] << block
    end
  end
end

.error_handlers_for(name) ⇒ Object



226
227
228
229
230
# File 'lib/creeper.rb', line 226

# Collects the "error" callbacks that apply to the job +name+.
#
# A name without a per-job list (never registered, or removed by .drop)
# contributes nothing instead of raising TypeError on `Array + nil`.
#
# @param name [Object] job name
# @return [Array] a new array: global :error_each hooks followed by the
#   hooks registered for +name+
def error_handlers_for(name)
  lock.synchronize do
    HANDLERS[:error_each] + (HANDLERS[:error_named][name] || [])
  end
end

.error_work(worker, data, name, job) ⇒ Object

workers ##



269
270
271
272
273
274
# File 'lib/creeper.rb', line 269

# Stamps +worker+ with the stop time and logs an error line containing the
# worker prefix, elapsed milliseconds and a dump of the job.
#
# @return [Time] the timestamp written to worker.stopped_at
def error_work(worker, data, name, job)
  finished = Time.now
  worker.stopped_at = finished
  OutLogger.error "#{worker.prefix} Error after #{worker.time_in_milliseconds}ms #{worker.dump(job, name, data)}"
  finished
end

.finalizer(&block) ⇒ Object



232
233
234
235
236
# File 'lib/creeper.rb', line 232

# Appends +block+ to the list of finalizers.
#
# @return [Array] the finalizer list after the append
def finalizer(&block)
  lock.synchronize { HANDLERS[:finalizers] << block }
end

.finalizers ⇒ Object



238
239
240
241
242
# File 'lib/creeper.rb', line 238

# Returns the registered finalizer blocks. This is the list stored in
# HANDLERS itself, not a copy.
def finalizers
  lock.synchronize { HANDLERS[:finalizers] }
end

.handler_for(name) ⇒ Object



172
173
174
175
176
# File 'lib/creeper.rb', line 172

# Looks up the handler block registered for the job +name+.
#
# @return [Proc, nil] the block stored by .job, or nil when none exists
def handler_for(name)
  lock.synchronize { HANDLERS[:named][name] }
end

.job(name, &block) ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/creeper.rb', line 152

# Registers +block+ as the handler for the job +name+ and makes sure the
# per-job before/after/error hook lists exist. Existing lists are kept.
#
# @param name [Object] job name
# @return [Proc, nil] the handler that was stored
def job(name, &block)
  lock.synchronize do
    [:before_named, :after_named, :error_named].each do |kind|
      HANDLERS[kind][name] ||= []
    end
    HANDLERS[:named][name] = block
  end
end

.out_logger ⇒ Object



70
71
72
73
74
# File 'lib/creeper.rb', line 70

# Returns the logger stored in @out_logger, creating a ::Logger that
# writes to $stdout on first use.
def out_logger
  lock.synchronize { @out_logger ||= ::Logger.new($stdout) }
end

.out_logger=(out_logger) ⇒ Object



76
77
78
79
80
# File 'lib/creeper.rb', line 76

# Replaces the logger returned by .out_logger, while holding the lock.
#
# @param out_logger [Object] the new logger
def out_logger=(out_logger)
  lock.synchronize { @out_logger = out_logger }
end

.register_worker(worker) ⇒ Object



276
277
278
279
280
281
282
283
# File 'lib/creeper.rb', line 276

# Adds +worker+ to WORKERS under the lowest number, counting from 0, that
# is not in use yet, and writes that number to worker.number.
#
# @param worker [Object] must respond to `number=`
# @return [Object] the worker itself
def register_worker(worker)
  lock.synchronize do
    taken  = WORKERS.keys
    upper  = (taken.max || 0) + 1
    number = (0..upper).find { |candidate| !taken.include?(candidate) }

    worker.number = number
    WORKERS[number] = worker
  end
end

.shutdown=(shutdown) ⇒ Object



100
101
102
103
104
# File 'lib/creeper.rb', line 100

# Sets the flag reported by .shutdown?, while holding the lock.
#
# @param shutdown [Object] any value; only its truthiness is reported
def shutdown=(shutdown)
  lock.synchronize { @shutdown = shutdown }
end

.shutdown? ⇒ Boolean

Returns:

  • (Boolean)


94
95
96
97
98
# File 'lib/creeper.rb', line 94

# Reports whether the shutdown flag is set.
#
# @return [Boolean] true when @shutdown is truthy, false otherwise
def shutdown?
  lock.synchronize { @shutdown ? true : false }
end

.shutdown_workers ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
# File 'lib/creeper.rb', line 285

# Stops the workers, escalating on timeout: soft shutdown first, then hard
# shutdown if that raises Timeout::Error, then kill if that does too.
#
# @return [Object] result of whichever stage finished
def shutdown_workers
  soft_shutdown_workers(Creeper.patience_soft)
rescue Timeout::Error
  begin
    hard_shutdown_workers(Creeper.patience_hard)
  rescue Timeout::Error
    kill_shutdown_workers
  end
end

.start_work(worker, data, name, job) ⇒ Object



297
298
299
300
301
# File 'lib/creeper.rb', line 297

# Stamps +worker+ with the start time and logs a "Working" line with the
# worker prefix and a dump of the job.
#
# @return [Time] the timestamp written to worker.started_at
def start_work(worker, data, name, job)
  began = Time.now
  worker.started_at = began
  OutLogger.info "#{worker.prefix} Working #{worker.dump(job, name, data)}"
  began
end

.stop_work(worker, data, name, job) ⇒ Object



303
304
305
306
307
# File 'lib/creeper.rb', line 303

# Stamps +worker+ with the stop time and logs a "Finished" line with the
# worker prefix, elapsed milliseconds and a dump of the job.
#
# @return [Time] the timestamp written to worker.stopped_at
def stop_work(worker, data, name, job)
  finished = Time.now
  worker.stopped_at = finished
  OutLogger.info "#{worker.prefix} Finished in #{worker.time_in_milliseconds}ms #{worker.dump(job, name, data)}"
  finished
end

.unregister_worker(worker, reason = nil) ⇒ Object



309
310
311
312
313
314
315
# File 'lib/creeper.rb', line 309

# Logs why +worker+ is going away and removes it from WORKERS.
#
# @param worker [Object] must respond to `prefix` and `number`
# @param reason [String, nil] text for the log line; 'Stopping' when falsy
# @return [Object, nil] the entry removed from WORKERS, if there was one
def unregister_worker(worker, reason = nil)
  OutLogger.info "#{worker.prefix} #{reason || 'Stopping'}"
  lock.synchronize { WORKERS.delete(worker.number) }
end

.work(jobs = nil, size = nil) ⇒ Object

daemon ##



134
135
136
137
138
139
140
# File 'lib/creeper.rb', line 134

# Loads the worker implementation on demand and starts processing.
#
# @param jobs [Object, nil] forwarded unchanged to Creeper::Worker.work
# @param size [Object, nil] replaces Creeper.pool_size when truthy;
#   otherwise the current pool size is written back unchanged
# @return [Object] result of Creeper::Worker.work
def work(jobs = nil, size = nil)
  require 'creeper/worker'

  pool = size || Creeper.pool_size
  Creeper.pool_size = pool

  Creeper::Worker.work(jobs, Creeper.pool_size)
end

.worker_pool ⇒ Object



82
83
84
85
86
# File 'lib/creeper.rb', line 82

# Returns the value stored in @worker_pool, read while holding the lock.
def worker_pool
  lock.synchronize { @worker_pool }
end

.worker_pool=(worker_pool) ⇒ Object



88
89
90
91
92
# File 'lib/creeper.rb', line 88

# Stores the value returned by .worker_pool, while holding the lock.
#
# @param worker_pool [Object] the new pool
def worker_pool=(worker_pool)
  lock.synchronize { @worker_pool = worker_pool }
end