Class: Banter::Server::RabbitMQSubscriber
- Inherits:
-
Object
- Object
- Banter::Server::RabbitMQSubscriber
- Defined in:
- lib/banter/server/rabbit_mq_subscriber.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#listener ⇒ Object
readonly
Returns the value of attribute listener.
-
#ttl ⇒ Object
readonly
Returns the value of attribute ttl.
Instance Method Summary collapse
-
#initialize(routing_key, queue_name, ttl, durable = true, pool_size = 100, topic = Banter::Configuration.topic_prefix) ⇒ RabbitMQSubscriber
constructor
A new instance of RabbitMQSubscriber.
-
#start(blocking = false) ⇒ Object
Pass in a block for this method to work.
- #teardown ⇒ Object
Constructor Details
#initialize(routing_key, queue_name, ttl, durable = true, pool_size = 100, topic = Banter::Configuration.topic_prefix) ⇒ RabbitMQSubscriber
Returns a new instance of RabbitMQSubscriber.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 10
#
# Stores the subscription settings and derives the binding key that #start
# later uses when binding the queue to the exchange.
#
# @param routing_key key segment appended after the topic; when it is not
#   `present?` the binding key is just "<topic>.#"
# @param queue_name name handed to the channel when the queue is declared in #start
# @param ttl message time-to-live; #start multiplies it by 1000 for
#   "x-message-ttl" (presumably seconds -> milliseconds; confirm with callers)
# @param durable durability flag applied to both exchange and queue in #start
# @param pool_size value passed to `basic_qos` in #start
# @param topic topic exchange name, also used as the routing-key prefix
# @return [RabbitMQSubscriber] a new instance of RabbitMQSubscriber
def initialize(routing_key, queue_name, ttl, durable = true, pool_size = 100, topic = Banter::Configuration.topic_prefix)
  @initial_key = routing_key
  @durable = durable
  @topic = topic
  @ttl = ttl
  @pool_size = pool_size

  # The trailing ".#" makes the binding match every key below the prefix.
  if @initial_key.present?
    @routing_key = "#{@topic}.#{@initial_key}.#"
  else
    @routing_key = "#{@topic}.#"
  end

  @queue_name = queue_name
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 7
#
# Returns the value of attribute channel.
# @channel is assigned in #start from `@connection.create_channel`.
def channel
  @channel
end
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
6 7 8 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 6
#
# Returns the value of attribute exchange.
# @exchange is assigned in #start from `@channel.topic(...)`.
def exchange
  @exchange
end
#listener ⇒ Object (readonly)
Returns the value of attribute listener.
5 6 7 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 5
#
# Returns the value of attribute listener.
# @listener is assigned in #start from the result of binding the queue to the exchange.
def listener
  @listener
end
#ttl ⇒ Object (readonly)
Returns the value of attribute ttl.
8 9 10 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 8
#
# Returns the value of attribute ttl.
# @ttl is assigned in the constructor and read by #start when it builds the
# "x-message-ttl" queue argument.
def ttl
  @ttl
end
Instance Method Details
#start(blocking = false) ⇒ Object
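Pass in a block for this method to work; the handler is invoked with `yield` and receives the delivery info, the properties and the parsed message. We might only want to expose the content instead of all 3 chunks.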
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 28
#
# Opens a Bunny connection, declares the topic exchange and the queue, binds
# them with the routing key computed in the constructor, and subscribes with
# manual acknowledgements.
#
# A block is required: each delivery is parsed into a message and handed to
# the block via `yield`. The delivery is acknowledged only after the block
# returns; nothing between the `yield` and the `ack` rescues, so a raising
# block leaves the delivery unacknowledged.
#
# @param blocking passed straight through as the `block:` option of `subscribe`
# @yield [delivery_info, properties, message] invoked once per delivery
# @raise re-raises whatever `@connection.start` raised, after notifying Airbrake
def start(blocking = false)
  @connection = Bunny.new(Configuration.connection)
  begin
    @connection.start
  rescue => e
    Airbrake.notify(e, params: { message: e.message, what_happened: "RabbitMQ unreachable!" }, environment_name: ENV['RAILS_ENV'])
    raise e
  end

  @channel = @connection.create_channel
  @exchange = @channel.topic(@topic, :durable => @durable, :auto_delete => false)

  queue_arguments = {}
  queue_arguments["x-dead-letter-exchange"] = Configuration.dead_letter_queue if Configuration.dead_letter_queue.present?
  queue_arguments["x-message-ttl"] = ttl * 1000
  @channel.basic_qos(@pool_size)

  rabbit_queue = @channel.queue(@queue_name, durable: @durable, exclusive: false, arguments: queue_arguments)
  @listener = rabbit_queue.bind(@exchange, routing_key: @routing_key, exclusive: false)

  # Parameters for subscribe that might be useful:
  # :block=>true - Used for long running consumer applications. (backend servers?)
  @consumer = @listener.subscribe(consumer_tag: @queue_name, manual_ack: true, block: blocking)
  # @consumer = @listener.subscribe(consumer_tag: @queue_name, block: blocking)

  @consumer.on_delivery do |delivery_info, properties, contents|
    Banter::RabbitLogger.log(Logger::DEBUG, "Message delivery with contents: #{contents}")

    # A redelivery is reported to Airbrake but still processed below.
    if delivery_info[:redelivered]
      e = StandardError.new("PubSub Message redelivery")
      Airbrake.notify(e, params: { info: delivery_info, props: properties, contents: contents }, environment_name: ENV['RAILS_ENV'], backtrace: caller)
    end

    message = ::Banter::Message.new.parse(contents)
    Banter::RabbitLogger.log_receive(delivery_info[:routing_key], message)
    yield delivery_info, properties, message
    Banter::RabbitLogger.log_complete(delivery_info[:routing_key], message)

    Banter::RabbitLogger.log(Logger::DEBUG, "Message acknowledged with tag #{delivery_info.delivery_tag}")
    # Need to acknowledge the message for the next message to come down.
    @channel.ack(delivery_info.delivery_tag)
    true
  end
end
#teardown ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 72
#
# Cancels the consumer and then closes the connection, each only when it is
# `present?`. Any StandardError raised along the way is reported to Airbrake
# and not re-raised, so teardown itself does not propagate the failure.
# Note that a failing `cancel` skips the `close` call.
def teardown
  @consumer.cancel if @consumer.present?
  @connection.close if @connection.present?
rescue => e
  Airbrake.notify(e, params: { error: e.message }, environment_name: ENV['RAILS_ENV'], backtrace: caller)
end