Module: Peon
Defined Under Namespace
Classes: Handler
Constant Summary
- @@already_running = false
Instance Method Summary
- #check_all ⇒ Object
- #enqueue(jobs, data = {}) ⇒ Object
- #error(&blk) ⇒ Object
- #job(queue, options = {}, &blk) ⇒ Object
- #log(msg) ⇒ Object
- #logger(&blk) ⇒ Object
- #process_job(job, stats) ⇒ Object
- #reserve_job ⇒ Object
- #run ⇒ Object
Instance Method Details
#check_all ⇒ Object
56 57 58 59 60 |
# File 'lib/peon.rb', line 56
# Calls +check+ on every handler stored in the @@handlers registry.
#
# The registry is keyed by queue name, but only the handler values are
# needed here. What Handler#check does is defined outside this listing.
#
# @return [Hash] the @@handlers registry itself
def check_all
  @@handlers.each_value { |registered| registered.check }
end
#enqueue(jobs, data = {}) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/peon.rb', line 13
# Serialises +data+ as JSON and puts it on a beanstalk tube.
#
# +jobs+ is either a single queue name or a list of queue names. When a
# list is given, its first element is the tube used now and any remaining
# names travel with the payload under the "next_job" key.
#
# Neither argument is modified: the job list and the data hash are copied
# before being changed, so callers can safely reuse them.
#
# @param jobs [Object, #shift] a queue name, or a list of queue names
# @param data [Hash] payload to serialise with +to_json+
# @return [Object] whatever +beanstalk.put+ returns
def enqueue(jobs, data = {})
  if jobs.respond_to?(:shift)
    # Work on a copy so the caller's job list is left intact.
    remaining = jobs.dup
    queue = remaining.shift
    unless remaining.empty?
      # Copy before adding the key so the caller's hash is not polluted.
      data = data.dup
      data["next_job"] = remaining
    end
  else
    queue = jobs
  end

  # Serialise once; the same string is logged and sent.
  payload = data.to_json
  log "send: #{queue}:#{payload}"
  beanstalk.use(queue)
  beanstalk.put(payload)
end
#error(&blk) ⇒ Object
31 32 33 |
# File 'lib/peon.rb', line 31
# Stores the given block as the module-wide error handler (@@error_handler).
#
# NOTE(review): nothing in the visible listing invokes @@error_handler;
# presumably it is called from code outside this page -- confirm.
#
# @yield the block to keep as the error handler
# @return [Proc, nil] the stored block (nil when no block is given)
def error(&blk)
  @@error_handler = blk
end
#job(queue, options = {}, &blk) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/peon.rb', line 39
# Registers a handler for +queue+ whose task is the given block.
#
# Builds a Peon::Handler for the queue, wires in lambdas that subscribe to
# (+jack.watch+) and unsubscribe from (+jack.ignore+) the tube, and stores
# the handler in the @@handlers registry under the queue name.
#
# When the first handler is registered, an +at_exit+ hook is installed
# that starts Peon.run unless the worker was already started explicitly.
#
# @param queue [Object] tube name the handler is registered under
# @param options [Hash] optional settings; only +:when+ is read here and,
#   if present, is assigned to the handler's +when+ attribute
# @yield the task to run for each job; process_job calls it with the
#   decoded job body
# @return [Peon::Handler] the handler that was registered
def job(queue, options = {}, &blk)
  handler = Peon::Handler.new(queue)
  handler.when = options[:when] if options[:when]

  handler.unsub = lambda do
    log "unsubscribing to #{queue}"
    jack.ignore(queue)
  end
  handler.sub = lambda do
    log "subscribing to #{queue}"
    jack.watch(queue)
  end
  handler.task = blk

  @@handlers ||= {}
  # Install the auto-run hook once, before the first handler is stored.
  at_exit { Peon.run unless @@already_running } if @@handlers.empty?
  @@handlers[queue] = handler
end
#log(msg) ⇒ Object
26 27 28 29 |
# File 'lib/peon.rb', line 26
# Sends +msg+ to the configured logger.
#
# If no logger has been set yet, a default one is installed on first use;
# it prints the message to standard output prefixed with the current time
# and the ":peon:" tag.
#
# @param msg [Object] the message to log
# @return [Object] whatever the logger block returns
def log(msg)
  (@@logger ||= proc { |line| puts "#{Time.now} :peon: #{line}" }).call(msg)
end
#logger(&blk) ⇒ Object
35 36 37 |
# File 'lib/peon.rb', line 35
# Replaces the logger used by +log+ with the given block.
#
# The block is stored in @@logger and later called with the message.
#
# @yield [msg] the message to log
# @return [Proc, nil] the stored block (nil when no block is given)
def logger(&blk)
  @@logger = blk
end
#process_job(job, stats) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/peon.rb', line 71
# Runs the registered task for a reserved job, then moves on to the next.
#
# Steps, in order: decode the job body as JSON, log it, call the task of
# the handler registered for the job's tube, delete the job, re-check all
# handlers, hand the result to +next_job+, and reserve another job.
#
# NOTE(review): JSON.load is used on a body read from the queue; if queue
# contents can be untrusted, JSON.parse is the safer call -- confirm.
# NOTE(review): no rescue is visible here, so a task that raises skips the
# delete and the next reserve -- confirm whether that is handled elsewhere.
#
# @param job [Object] the reserved job; must respond to +body+
# @param stats [Hash] job statistics; the 'tube' entry names the queue
# @return [Object] whatever +reserve_job+ returns
def process_job(job, stats)
  tube = stats['tube']
  payload = JSON.load(job.body)
  log "recv: #{tube}:#{job.body}"

  result = @@handlers[tube].task.call(payload)
  jack.delete(job)

  check_all
  next_job(payload, result)
  reserve_job
end
#reserve_job ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/peon.rb', line 62
# Asks +jack+ to reserve a job and schedules its processing.
#
# Once the reservation succeeds, the job's stats are requested; when those
# arrive, both are passed to +process_job+. Only success callbacks are
# registered here.
#
# @return [Object] whatever registering the outer callback returns
def reserve_job
  jack.reserve.callback do |job|
    job.stats.callback { |stats| process_job(job, stats) }
  end
end
#run ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/peon.rb', line 83
# Starts the worker.
#
# Logs a start-up message, marks the worker as running (so the at_exit
# hook installed by +job+ does not start it again), installs INT and TERM
# handlers that stop EventMachine, and then runs EventMachine, checking
# all handlers and reserving the first job.
#
# @return [Object] whatever +EM.run+ returns
def run
  log "Starting up"
  @@already_running = true

  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) { EM.stop }
  end

  EM.run do
    check_all
    reserve_job
  end
end