Class: PikaQue::Broker
- Inherits: Object
- Inheritance chain: Object → PikaQue::Broker
- Defined in:
- lib/pika_que/broker.rb
Instance Method Summary
- #channel ⇒ Object
- #cleanup(force = false) ⇒ Object
- #default_handler ⇒ Object
- #exchange ⇒ Object
- #handler(handler_class, handler_opts = {}) ⇒ Object
- #init_channel ⇒ Object
- #initialize(processor = nil, opts = {}) ⇒ Broker (constructor): a new instance of Broker.
- #local_connection? ⇒ Boolean
- #queue(queue_name, queue_opts = {}) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
Instance Method Details
#channel ⇒ Object
64 65 66 |
# File 'lib/pika_que/broker.rb', line 64
#
# Returns the broker's channel, creating it through init_channel the first
# time it is requested (or whenever @channel is currently unset).
#
# @return [Object] the cached channel
def channel
  return @channel if @channel

  @channel = init_channel
end
#cleanup(force = false) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/pika_que/broker.rb', line 74
#
# Releases the channel, exchange and handlers held by this broker.
#
# When @processor is set the cleanup is skipped unless +force+ is truthy;
# without a processor it always runs.
#
# @param force [Boolean] run the cleanup even when @processor is set
# @return [void]
def cleanup(force = false)
  return if @processor && !force

  # @channel is only assigned lazily by #channel and is reset to nil below,
  # so it can be absent on an unused broker or on a repeated cleanup.
  @channel.close if @channel && !@channel.closed?
  @channel = nil
  @exchange = nil

  if @default_handler
    @default_handler.close
    @default_handler = nil
  end

  # Forget the cached handlers once they are closed so #handler builds a
  # fresh instance instead of returning a closed one.
  @handlers.each_value(&:close)
  @handlers.clear
end
#default_handler ⇒ Object
56 57 58 |
# File 'lib/pika_que/broker.rb', line 56
#
# Returns the memoized fallback handler.
#
# When @opts[:handler_class] is set, it is resolved with
# PikaQue::Util.constantize and instantiated with @opts[:handler_options]
# merged with the broker's connection. Otherwise a
# PikaQue::Handlers::DefaultHandler is created with no arguments.
#
# @return [Object] the handler instance
def default_handler
  @default_handler ||=
    if @opts[:handler_class]
      handler_class = PikaQue::Util.constantize(@opts[:handler_class])
      # A missing :handler_options entry is treated as an empty hash, the
      # same default #handler uses for its options.
      handler_options = @opts[:handler_options] || {}
      handler_class.new(handler_options.merge({ connection: @connection }))
    else
      PikaQue::Handlers::DefaultHandler.new
    end
end
#exchange ⇒ Object
60 61 62 |
# File 'lib/pika_que/broker.rb', line 60
#
# Returns the broker's exchange, declaring it on the channel on first use
# with the name and options stored under @opts[:exchange] and
# @opts[:exchange_options].
#
# @return [Object] the cached exchange
def exchange
  return @exchange if @exchange

  exchange_name, exchange_options = @opts.values_at(:exchange, :exchange_options)
  @exchange = channel.exchange(exchange_name, exchange_options)
end
#handler(handler_class, handler_opts = {}) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pika_que/broker.rb', line 42
#
# Returns a handler instance for +handler_class+, cached per combination of
# class and options; falls back to default_handler when no class is given.
#
# New instances receive +handler_opts+ merged with the broker's connection.
#
# @param handler_class [Class, nil] handler class to instantiate, or nil for the default handler
# @param handler_opts [Hash] options passed to the handler's constructor
# @return [Object] the cached or newly built handler
def handler(handler_class, handler_opts = {})
  return default_handler unless handler_class

  # The options hash value is part of the key, so differing options yield
  # separate cached instances.
  cache_key = "#{handler_class}-#{handler_opts.hash}"
  @handlers[cache_key] ||= handler_class.new(handler_opts.merge({ connection: @connection }))
end
#init_channel ⇒ Object
68 69 70 71 72 |
# File 'lib/pika_que/broker.rb', line 68
#
# Creates a new channel on the broker's connection and applies the
# configured prefetch value to it.
#
# Reads :consumer_pool_size and :prefetch from @opts[:channel_options].
#
# @return [Object] the newly created channel
def init_channel
  channel_settings = @opts[:channel_options]
  new_channel = @connection.create_channel(nil, channel_settings[:consumer_pool_size])
  new_channel.prefetch(channel_settings[:prefetch])
  new_channel
end
#local_connection? ⇒ Boolean
22 23 24 |
# File 'lib/pika_que/broker.rb', line 22
#
# Tells whether this broker has its own connection settings or runs without
# a processor.
#
# @return [Object] @opts[:connection_options] when truthy, otherwise
#   true/false depending on whether @processor is nil
def local_connection?
  own_options = @opts[:connection_options]
  return own_options if own_options

  @processor.nil?
end
#queue(queue_name, queue_opts = {}) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/pika_que/broker.rb', line 26
#
# Declares +queue_name+ on the broker's channel and binds it to the broker's
# exchange once per routing key.
#
# The routing keys are queue_opts[:routing_key] (falling back to the queue
# name) followed by every entry of queue_opts[:routing_keys].
#
# @param queue_name [Object] name of the queue, passed to channel.queue
# @param queue_opts [Hash] options passed to channel.queue; :routing_key and
#   :routing_keys choose the bindings
# @return [Object] the declared queue
# @raise [SetupError] when declaring or binding raises a StandardError; the
#   original message is logged at fatal level and reused
def queue(queue_name, queue_opts = {})
  declared_queue = channel.queue(queue_name, queue_opts)
  primary_key = queue_opts[:routing_key] || queue_name

  [primary_key, *queue_opts[:routing_keys]].each do |key|
    declared_queue.bind(exchange, routing_key: key)
  end

  declared_queue
rescue => e
  PikaQue.logger.fatal e.message
  raise SetupError, e.message
end
#start ⇒ Object
13 14 15 16 |
# File 'lib/pika_que/broker.rb', line 13
#
# Picks the connection for this broker (once) and makes sure it is
# established.
#
# A connection is created from @opts[:connection_options] when those are
# given; otherwise PikaQue.connection is used. An already assigned
# @connection is kept as is.
#
# @return [Object] whatever @connection.ensure_connection returns
def start
  unless @connection
    connection_options = @opts[:connection_options]
    @connection =
      if connection_options
        PikaQue::Connection.create(connection_options)
      else
        PikaQue.connection
      end
  end

  @connection.ensure_connection
end
#stop ⇒ Object
18 19 20 |
# File 'lib/pika_que/broker.rb', line 18
#
# Disconnects the broker's connection, but only when local_connection?
# reports a truthy value.
#
# @return [Object, nil] the result of @connection.disconnect!, or nil when
#   nothing was disconnected
def stop
  return unless local_connection?

  @connection.disconnect!
end