Class: PikaQue::Subscriber
- Inherits:
-
Object
- Object
- PikaQue::Subscriber
- Defined in:
- lib/pika_que/subscriber.rb
Instance Attribute Summary collapse
-
#broker ⇒ Object
Returns the value of attribute broker.
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#pool ⇒ Object
Returns the value of attribute pool.
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary collapse
- #handle_message(worker, delivery_info, metadata, msg) ⇒ Object
-
#initialize(opts = {}) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #setup_handler(handler_class, handler_opts) ⇒ Object
- #setup_queue(queue_name, queue_opts) ⇒ Object
- #subscribe(worker) ⇒ Object
- #teardown ⇒ Object
- #unsubscribe ⇒ Object
Methods included from Metrics
init_metrics, metrics, #metrics
Methods included from Reporters
Methods included from Logging
init_logger, logger, #logger, logger=
Constructor Details
#initialize(opts = {}) ⇒ Subscriber
Returns a new instance of Subscriber.
13 14 15 16 17 18 |
# File 'lib/pika_que/subscriber.rb', line 13
#
# Builds a subscriber from the global PikaQue configuration merged with +opts+.
#
# @param opts [Hash] overrides merged over PikaQue.config; the keys read here
#   are :codec, :broker, :worker_pool and :concurrency.
def initialize(opts = {})
  @opts  = PikaQue.config.merge(opts)
  @codec = PikaQue::Util.constantize(@opts[:codec])

  # Reuse an injected broker; otherwise create one and start it before storing it.
  @broker = @opts[:broker] || begin
    fresh_broker = PikaQue::Broker.new(nil, @opts)
    fresh_broker.start
    fresh_broker
  end

  # Reuse an injected pool; otherwise build a fixed pool sized by :concurrency (1 when unset).
  @pool = @opts[:worker_pool]
  @pool ||= Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
end
Instance Attribute Details
#broker ⇒ Object
Returns the value of attribute broker.
11 12 13 |
# File 'lib/pika_que/subscriber.rb', line 11
#
# Reader for the broker this subscriber talks to.
#
# @return [Object] the current value of @broker
def broker
  @broker
end
#handler ⇒ Object
Returns the value of attribute handler.
11 12 13 |
# File 'lib/pika_que/subscriber.rb', line 11
#
# Reader for the handler assigned by #setup_handler.
#
# @return [Object] the current value of @handler
def handler
  @handler
end
#pool ⇒ Object
Returns the value of attribute pool.
11 12 13 |
# File 'lib/pika_que/subscriber.rb', line 11
#
# Reader for the worker pool that delivery jobs are posted to.
#
# @return [Object] the current value of @pool
def pool
  @pool
end
#queue ⇒ Object
Returns the value of attribute queue.
11 12 13 |
# File 'lib/pika_que/subscriber.rb', line 11
#
# Reader for the queue assigned by #setup_queue.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
Instance Method Details
#handle_message(worker, delivery_info, metadata, msg) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/pika_que/subscriber.rb', line 40
#
# Processes one delivery: decodes the payload, runs the worker inside the
# middleware chain, then (when acking is enabled) hands the outcome to the handler.
#
# @param worker [Object] object responding to #work(delivery_info, metadata, decoded_msg)
# @param delivery_info [Object] delivery information passed through untouched
# @param metadata [Object] message metadata passed through untouched
# @param msg [Object] raw (still encoded) payload
# @return [Object] whatever the final metrics.increment call returns
def handle_message(worker, delivery_info, metadata, msg)
  res = nil
  error = nil
  begin
    decoded_msg = @codec.decode(msg)
    metrics.measure("work.#{worker.class.name}.time") do
      PikaQue.middleware.invoke(worker, delivery_info, metadata, decoded_msg) do
        res = worker.work(delivery_info, metadata, decoded_msg)
      end
    end
    logger.debug "done processing #{res} <#{msg}>"
  rescue => worker_err
    # Decode, middleware and worker failures all become an :error result so the
    # handler below still gets a chance to deal with the delivery.
    res = :error
    error = worker_err
    notify_reporters(worker_err, worker.class, msg)
  end

  if @opts[:ack]
    begin
      # The handler receives the raw payload, not the decoded one.
      handler.handle(res, broker.channel, delivery_info, metadata, msg, error)
      metrics.increment("work.#{worker.class.name}.handled.#{res}")
    rescue => handler_err
      # A failing handler is reported and counted, but never re-raised.
      notify_reporters(handler_err, handler.class, msg)
      metrics.increment("work.#{worker.class.name}.handler.error")
    end
  else
    metrics.increment("work.#{worker.class.name}.handled.noop")
  end
  metrics.increment("work.#{worker.class.name}.processed")
end
#setup_handler(handler_class, handler_opts) ⇒ Object
24 25 26 27 28 29 |
# File 'lib/pika_que/subscriber.rb', line 24
#
# Obtains a handler from the broker and binds the current queue to it.
# Expects @queue to be set already, since its name is read here.
#
# @param handler_class [Object] handler class passed through to broker.handler
# @param handler_opts [Hash, nil] overrides merged over @opts[:handler_options];
#   nil is treated as an empty hash
# @return [Object] whatever @handler.bind_queue returns
def setup_handler(handler_class, handler_opts)
  overrides = handler_opts || {}
  merged_opts = @opts[:handler_options].merge(overrides)
  @handler = broker.handler(handler_class, merged_opts)

  # TODO use routing keys?
  logger.info "binding queue #{@queue.name} to handler #{@handler.class}"
  @handler.bind_queue(@queue, @queue.name)
end
#setup_queue(queue_name, queue_opts) ⇒ Object
20 21 22 |
# File 'lib/pika_que/subscriber.rb', line 20
#
# Declares the queue through the broker and remembers it in @queue.
#
# @param queue_name [Object] queue name passed through to broker.queue
# @param queue_opts [Hash, nil] overrides merged over @opts[:queue_options];
#   nil is treated as an empty hash, matching how #setup_handler treats its options
# @return [Object] the queue returned by broker.queue
def setup_queue(queue_name, queue_opts)
  # Guard against nil so a caller without per-queue options does not hit
  # a TypeError from Hash#merge.
  merged_opts = @opts[:queue_options].merge(queue_opts || {})
  @queue = broker.queue(queue_name, merged_opts)
end
#subscribe(worker) ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/pika_que/subscriber.rb', line 31
#
# Registers a consumer on the queue (passing :block => false) and routes every
# delivery to #handle_message via the worker pool.
#
# @param worker [Object] object responding to #consumer_arguments; it is also
#   handed to #handle_message for each delivery
# @return [Object] the consumer returned by queue.subscribe (also kept in @consumer)
def subscribe(worker)
  # :manual_ack mirrors the :ack option, the same flag #handle_message checks
  # before calling the handler.
  @consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack], :arguments => worker.consumer_arguments) do |delivery_info, metadata, msg|
    # TODO make idletime configurable on thread pool? default is 60.
    pool.post do
      handle_message(worker, delivery_info, metadata, msg)
    end
  end
end
#teardown ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/pika_que/subscriber.rb', line 76
#
# Releases the resources held by this subscriber.
#
# @return [Object] whatever broker.stop returns
def teardown
  # A pool supplied through opts[:worker_pool] is left alone; only the pool
  # created in #initialize is shut down here.
  pool_was_injected = @opts[:worker_pool]
  if !pool_was_injected
    @pool.shutdown
    # Bounded wait of 12 (presumably seconds — confirm against the pool's API).
    @pool.wait_for_termination(12)
  end

  broker.cleanup
  broker.stop
end
#unsubscribe ⇒ Object
71 72 73 74 |
# File 'lib/pika_que/subscriber.rb', line 71
#
# Cancels the consumer created by #subscribe, if there is one, and forgets it.
# Calling it with no active consumer does nothing.
#
# @return [nil]
def unsubscribe
  @consumer && @consumer.cancel
  @consumer = nil
end