Class: Emque::Producing::Publisher::RabbitMq

Inherits:
Base
  • Object
show all
Defined in:
lib/emque/producing/publisher/rabbitmq.rb

Constant Summary collapse

# Connection built from the configured RabbitMQ URL and started as soon as
# this constant is evaluated.
CONN = Bunny.new(Emque::Producing.configuration.rabbitmq_options[:url]).tap(&:start)

# Pool of channels handed out when the caller asks for raise_on_failure;
# seeded with one channel created on CONN.
CONFIRM_CHANNEL_POOL = Queue.new.tap do |pool|
  pool << CONN.create_channel
end

# Pool of channels handed out for all other publishes; seeded with one
# channel created on CONN.
CHANNEL_POOL = Queue.new.tap do |pool|
  pool << CONN.create_channel
end

Instance Method Summary collapse

Methods inherited from Base

#handle_error, #host_name

Instance Method Details

#get_channel(raise_on_failure) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/emque/producing/publisher/rabbitmq.rb', line 65

# Takes a channel from the pool matching the requested mode without
# blocking; when that pool is empty a new channel is created on CONN.
#
# @param raise_on_failure [Boolean] truthy selects CONFIRM_CHANNEL_POOL,
#   falsy selects CHANNEL_POOL
# @return [Object] a pooled channel, or a freshly created one
def get_channel(raise_on_failure)
  pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL
  # Non-blocking pop: an empty queue raises ThreadError instead of waiting.
  pool.pop(true)
rescue ThreadError
  CONN.create_channel
end

#publish(topic, message_type, message, key = nil, raise_on_failure) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/emque/producing/publisher/rabbitmq.rb', line 21

# Publishes +message+ to the durable, non-auto-deleting fanout exchange
# named +topic+.
#
# A channel is borrowed via #get_channel and pushed back onto the matching
# pool in the +ensure+ clause, whether or not publishing raised.
#
# @param topic [String] name of the fanout exchange
# @param message_type [Object] sent as the message's :type property
# @param message [Object] payload handed to the exchange; published with
#   content type "application/json"
# @param key [Object, nil] accepted for interface compatibility; not used
#   by this publisher
# @param raise_on_failure [Boolean] when truthy, publisher confirms are
#   enabled on the channel and the method waits for the broker's answer
# @return [Boolean] false when the message was returned by the exchange
#   before this method finished, or (with +raise_on_failure+) when the
#   broker nacked it; true otherwise
def publish(topic, message_type, message, key = nil, raise_on_failure)
  ch = get_channel(raise_on_failure)

  ch.open if ch.closed?
  begin
    exchange = ch.fanout(topic, :durable => true, :auto_delete => false)

    ch.confirm_select if raise_on_failure
    sent = true

    # NOTE(review): the handler only affects the result if it runs before
    # this method returns — confirm when the client library invokes it.
    exchange.on_return do |return_info, properties, _content|
      Emque::Producing.logger.warn("App [#{properties[:app_id]}] message was returned from exchange [#{return_info[:exchange]}]")
      sent = false
    end

    exchange.publish(
      message,
      :mandatory => true,
      :persistent => true,
      :type => message_type,
      :app_id => Emque::Producing.configuration.app_name,
      :content_type => "application/json"
    )

    if raise_on_failure && !ch.wait_for_confirms
      Emque::Producing.logger.warn("RabbitMQ Publisher: message was nacked")
      ch.nacked_set.each do |n|
        Emque::Producing.logger.warn("message id: #{n}")
      end
      # A nack means the broker did not accept the message, so report it
      # as unsent instead of only logging it.
      sent = false
    end

    sent
  ensure
    pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL
    pool << ch unless ch.nil?
  end
end