Class: BBQueue::Consumer
- Inherits:
-
Object
- Object
- BBQueue::Consumer
- Defined in:
- lib/bbqueue/consumer.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #before_fork ⇒ Object
- #fork? ⇒ Boolean
- #fork_and_wait ⇒ Object
-
#initialize(queue_names, options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #work(job, queue_name) ⇒ Object
Constructor Details
#initialize(queue_names, options = {}) ⇒ Consumer
Returns a new instance of Consumer.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/bbqueue/consumer.rb', line 45
#
# Builds the consumer and registers one job handler per queue, plus
# before/after/error logging hooks, on a new Stalking::Consumer.
#
# @param queue_names [Object] a single queue name or a list of them; it is
#   normalised with Array() before iterating.
# @param options [Hash] passed on to Stalking::Consumer.new with its :logger
#   entry replaced by a BBQueue::FatalLogger wrapping the logger chosen here.
#   options[:logger] falls back to BBQueue::NullLogger.new when absent.
def initialize(queue_names, options = {})
  # Assign the attribute and keep a local of the same name: the hook blocks
  # below reference the local variable `logger`.
  self.logger = logger = options[:logger] || BBQueue::NullLogger.new

  # Captured so the job blocks can call #work on this object.
  # NOTE(review): presumably Stalking evaluates the block with a different
  # `self`, which is why a local is needed -- confirm against Stalking.
  consumer = self

  Stalking::Consumer.new(options.merge(:logger => BBQueue::FatalLogger.new(logger))) do
    Array(queue_names).each do |queue_name|
      job queue_name do |args|
        consumer.work BBQueue::Serializer.load(args["object"]), queue_name
      end
    end

    before do |queue_name, args|
      logger.info "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} started"
    end

    after do |queue_name, args|
      logger.info "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} finished"
    end

    error do |e, queue_name, args|
      logger.error "Job #{BBQueue::Serializer.load(args["object"]).inspect} on #{queue_name.inspect} failed"
      logger.error e
    end
  end
end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
4 5 6 |
# File 'lib/bbqueue/consumer.rb', line 4
#
# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
Instance Method Details
#after_fork ⇒ Object
10 11 12 |
# File 'lib/bbqueue/consumer.rb', line 10
#
# Hook that #fork_and_wait calls inside the forked child, before the work
# block is yielded to. Does nothing here.
# NOTE(review): presumably an extension point for subclasses -- confirm.
#
# @return [nil]
def after_fork
  # Nothing
end
#before_fork ⇒ Object
6 7 8 |
# File 'lib/bbqueue/consumer.rb', line 6
#
# Hook that #fork_and_wait calls in the parent process immediately before
# forking. Does nothing here.
# NOTE(review): presumably an extension point for subclasses -- confirm.
#
# @return [nil]
def before_fork
  # Nothing
end
#fork? ⇒ Boolean
14 15 16 |
# File 'lib/bbqueue/consumer.rb', line 14
#
# Tells #fork_and_wait whether to run the work in a forked child process.
# Always false here, so work runs inline.
#
# @return [Boolean]
def fork?
  false
end
#fork_and_wait ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/bbqueue/consumer.rb', line 18
#
# Runs the given block either inline or in a forked child, depending on
# #fork?.
#
# When forking, #before_fork runs in the parent before the fork, #after_fork
# runs in the child before the block, and the parent blocks until that child
# has exited.
#
# @yield the work to perform
# @return [Object] the child's pid when forking, otherwise the block's value
def fork_and_wait
  return yield unless fork?

  before_fork

  child_pid = Process.fork do
    after_fork

    yield
  end

  # Wait for this specific child. A bare Process.wait returns as soon as any
  # child of the process exits, which need not be the one started above.
  Process.wait(child_pid)
end
#work(job, queue_name) ⇒ Object
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/bbqueue/consumer.rb', line 34
#
# Runs a job through #fork_and_wait. A failure raised by the job is logged
# via #logger and not re-raised.
#
# @param job [#work] the job to execute
# @param queue_name [Object] used only, via #inspect, in the failure message
def work(job, queue_name)
  fork_and_wait do
    begin
      job.work
    # Timeout::Error is listed explicitly next to StandardError.
    # NOTE(review): presumably for Rubies where Timeout::Error is not a
    # StandardError subclass -- confirm before simplifying.
    rescue Timeout::Error, StandardError => e
      logger.error "Job #{job.inspect} on #{queue_name.inspect} failed"
      logger.error e
    end
  end
end