Module: Peon

Extended by:
Peon
Included in:
Peon
Defined in:
lib/peon.rb,
lib/peon/handler.rb

Defined Under Namespace

Classes: Handler

Constant Summary collapse

@@already_running =
false

Instance Method Summary collapse

Instance Method Details

#check_all ⇒ Object



56
57
58
59
60
# File 'lib/peon.rb', line 56

# Invokes +check+ on every handler stored in @@handlers.
# Only the handlers (hash values) are used; the queue names are ignored.
def check_all
	@@handlers.each_value(&:check)
end

#enqueue(jobs, data = {}) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/peon.rb', line 13

# Sends a job to a beanstalk tube as a JSON document.
#
# jobs - either a single queue name, or a list-like object (anything that
#        responds to +shift+). For a list, the first element is the queue
#        used for this job and the remaining elements, if any, are stored in
#        the payload under the "next_job" key.
# data - the job payload; it is serialized with +to_json+.
#
# Neither +jobs+ nor +data+ is modified: the bookkeeping is done on copies,
# so a caller can reuse the same job list or payload for later calls.
#
# Returns whatever +beanstalk.put+ returns.
def enqueue(jobs, data = {})
	queue = jobs
	payload = data

	if jobs.respond_to? :shift
		# Work on a copy so the caller's list keeps its first element.
		remaining = jobs.dup
		queue = remaining.shift
		unless remaining.empty?
			# Copy before adding the key so "next_job" does not leak into the caller's object.
			payload = data.dup
			payload["next_job"] = remaining
		end
	end

	# Serialize once; the same text is logged and sent.
	body = payload.to_json
	log "send: #{queue}:#{body}"
	beanstalk.use(queue)
	beanstalk.put(body)
end

#error(&blk) ⇒ Object



31
32
33
# File 'lib/peon.rb', line 31

# Stores the given block in @@error_handler, replacing any previous value.
# Calling it without a block stores nil.
# NOTE(review): nothing visible here invokes @@error_handler — confirm where
# it is called and which arguments the block receives.
def error(&blk)
	@@error_handler = blk
end

#job(queue, options = {}, &blk) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/peon.rb', line 39

# Registers +blk+ as the task for +queue+ and returns the new handler.
#
# queue   - the queue name; used as the key in @@handlers and passed to
#           +jack.watch+ / +jack.ignore+ by the handler's sub/unsub lambdas.
# options - optional hash; a truthy +:when+ entry is assigned to the handler's
#           +when+ attribute.
# blk     - the block stored as the handler's task.
#
# On the first registration (while @@handlers is still empty) an +at_exit+
# hook is installed that calls Peon.run unless @@already_running is true.
def job(queue, options = {}, &blk)
	handler = Peon::Handler.new(queue)

	condition = options[:when]
	handler.when = condition if condition

	handler.unsub = -> {
		log "unsubscribing to #{queue}"
		jack.ignore(queue)
	}
	handler.sub = -> {
		log "subscribing to #{queue}"
		jack.watch(queue)
	}
	handler.task = blk

	@@handlers ||= {}
	if @@handlers.empty?
		# Only the first registered job installs the exit hook.
		at_exit do
			Peon.run unless @@already_running == true
		end
	end
	@@handlers[queue] = handler
end

#log(msg) ⇒ Object



26
27
28
29
# File 'lib/peon.rb', line 26

# Passes +msg+ to the current logger callable and returns its result.
# When no logger is set, a default is installed first; it prints the message
# with +puts+, prefixed by the current time and ":peon:".
def log(msg)
	(@@logger ||= proc { |text| puts "#{Time.now} :peon: #{text}" }).call(msg)
end

#logger(&blk) ⇒ Object



35
36
37
# File 'lib/peon.rb', line 35

# Replaces the callable that #log sends messages to with the given block.
# The block is invoked with a single argument, the message.
# Calling this without a block stores nil, so the next #log call falls back
# to its built-in default logger.
def logger(&blk)
	@@logger = blk
end

#process_job(job, stats) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/peon.rb', line 71

# Runs the registered task for one reserved job, then reserves the next one.
#
# job   - the reserved job; its +body+ is decoded with JSON.load.
# stats - the job's stats; +stats['tube']+ selects the handler.
#
# Steps, in order: decode the body, log it, call the handler's task with the
# decoded body, delete the job, re-check all handlers, pass the decoded body
# and the task's result to +next_job+, and reserve another job.
#
# NOTE(review): if the task raises, none of the later steps run (the job is
# not deleted and no new job is reserved). A tube with no entry in @@handlers
# would likewise fail on +.task+ — confirm this is handled elsewhere.
def process_job(job, stats)
	tube = stats['tube']
	payload = JSON.load(job.body)
	log "recv: #{tube}:#{job.body}"

	result = @@handlers[tube].task.call(payload)
	jack.delete(job)

	check_all
	next_job(payload, result)
	reserve_job
end

#reserve_job ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/peon.rb', line 62

# Asks +jack+ to reserve a job. When the reservation's callback fires, the
# job's stats are requested, and once those arrive both are handed to
# +process_job+. Returns the result of registering the outer callback.
def reserve_job
	jack.reserve.callback { |job|
		job.stats.callback { |stats| process_job(job, stats) }
	}
end

#run ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/peon.rb', line 83

# Starts the worker: logs a startup message, sets @@already_running to true,
# installs INT and TERM traps that call EM.stop, and then enters EM.run, where
# all handlers are checked and the first job is reserved.
def run
	log "Starting up"
	@@already_running = true

	# Both signals stop EventMachine in the same way.
	%w[INT TERM].each do |signal|
		Signal.trap(signal) { EM.stop }
	end

	EM.run do
		check_all
		reserve_job
	end
end