Class: Banter::Server::RabbitMQSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/banter/server/rabbit_mq_subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(routing_key, queue_name, ttl, durable = true, pool_size = 100, topic = Banter::Configuration.topic_prefix) ⇒ RabbitMQSubscriber

Returns a new instance of RabbitMQSubscriber.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 10

def initialize(routing_key, queue_name, ttl, durable = true, pool_size = 100, topic = Banter::Configuration.topic_prefix)
  @initial_key = routing_key
  @durable     = durable
  @topic       = topic
  @ttl         = ttl
  @pool_size   = pool_size

  if @initial_key.present?
    @routing_key = "#{@topic}.#{@initial_key}.#"
  else
    @routing_key = "#{@topic}.#"
  end

  @queue_name = queue_name
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 7

def channel
  @channel
end

#exchangeObject (readonly)

Returns the value of attribute exchange.



6
7
8
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 6

def exchange
  @exchange
end

#listenerObject (readonly)

Returns the value of attribute listener.



5
6
7
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 5

def listener
  @listener
end

#ttlObject (readonly)

Returns the value of attribute ttl.



8
9
10
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 8

def ttl
  @ttl
end

Instance Method Details

#start(blocking = false) ⇒ Object

pass in a lambda for this method to work. We might only want to expose the content instead of all 3 chunks.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 28

def start(blocking=false)
  @connection = Bunny.new(Configuration.connection)
  begin
    @connection.start
  rescue => e
    Airbrake.notify(e, params: { message: e.message, what_happened: "RabbitMQ unreachable!" }, environment_name: ENV['RAILS_ENV'])
    raise e
  end

  @channel  = @connection.create_channel
  @exchange = @channel.topic(@topic, :durable => @durable, :auto_delete => false)

  queue_arguments                           = {}
  queue_arguments["x-dead-letter-exchange"] = Configuration.dead_letter_queue if Configuration.dead_letter_queue.present?
  queue_arguments["x-message-ttl"]          = ttl * 1000

  @channel.basic_qos(@pool_size)

  rabbit_queue = @channel.queue(@queue_name, durable: @durable, exclusive: false, arguments: queue_arguments)
  @listener                                 = rabbit_queue.bind(@exchange, routing_key: @routing_key, exclusive: false)

  # Parameters for subscribe that might be useful:
  # :block=>true - Used for long running consumer applications.  (backend servers?)

  @consumer = @listener.subscribe(consumer_tag: @queue_name,  manual_ack: true, block: blocking)
  # @consumer = @listener.subscribe(consumer_tag: @queue_name, block: blocking)

  @consumer.on_delivery do |delivery_info, properties, contents|
    Banter::RabbitLogger.log(Logger::DEBUG, "Message delivery with contents: #{contents}")
    if delivery_info[:redelivered]
      e = StandardError.new("PubSub Message redelivery")
      Airbrake.notify(e, params: { info: delivery_info, props: properties, contents: contents }, environment_name: ENV['RAILS_ENV'], backtrace: caller)
    end
    message = ::Banter::Message.new.parse(contents)
    Banter::RabbitLogger.log_receive(delivery_info[:routing_key], message)
    yield delivery_info, properties, message
    Banter::RabbitLogger.log_complete(delivery_info[:routing_key], message)
    Banter::RabbitLogger.log(Logger::DEBUG, "Message acknowledged with tag #{delivery_info.delivery_tag}")
    # Need to acknowledge the message for the next message to come down.
    @channel.ack(delivery_info.delivery_tag)
    true
  end
end

#teardownObject



72
73
74
75
76
77
78
79
# File 'lib/banter/server/rabbit_mq_subscriber.rb', line 72

def teardown
  begin
    @connection.close if @connection.present?
    @consumer.cancel if @consumer.present?
  rescue => e
    Airbrake.notify(e, params: { error: e.message }, environment_name: ENV['RAILS_ENV'], backtrace: caller)
  end
end