Class: Promiscuous::AMQP::Bunny

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/amqp/bunny.rb

Direct Known Subclasses

HotBunnies

Defined Under Namespace

Modules: Subscriber

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBunny

Returns a new instance of Bunny.



23
24
25
26
27
28
# File 'lib/promiscuous/amqp/bunny.rb', line 23

def initialize
  initialize_driver
  # The bunny socket doesn't like when multiple threads access to it apparently
  @connection_lock = Mutex.new
  @callback_mapping = {}
end

Instance Attribute Details

#callback_mappingObject

Returns the value of attribute callback_mapping.



16
17
18
# File 'lib/promiscuous/amqp/bunny.rb', line 16

def callback_mapping
  @callback_mapping
end

#connectionObject

Returns the value of attribute connection.



16
17
18
# File 'lib/promiscuous/amqp/bunny.rb', line 16

def connection
  @connection
end

#connection_lockObject

Returns the value of attribute connection_lock.



16
17
18
# File 'lib/promiscuous/amqp/bunny.rb', line 16

def connection_lock
  @connection_lock
end

Class Method Details

.hijack_bunnyObject



2
3
4
5
6
7
8
9
10
11
12
13
14
# File 'lib/promiscuous/amqp/bunny.rb', line 2

def self.hijack_bunny
  return if @bunny_hijacked
  ::Bunny::Session.class_eval do
    alias_method :handle_network_failure_without_promiscuous, :handle_network_failure

    def handle_network_failure(e)
      Promiscuous.warn "[amqp] #{e}. Reconnecting..."
      Promiscuous::Config.error_notifier.call(e)
      handle_network_failure_without_promiscuous(e)
    end
  end
  @bunny_hijacked = true
end

Instance Method Details

#connectObject



59
60
61
62
63
64
# File 'lib/promiscuous/amqp/bunny.rb', line 59

def connect
  connection_options = { :url      => Promiscuous::Config.publisher_amqp_url,
                         :exchange => Promiscuous::Config.publisher_exchange,
                         :confirm  => true }
  @connection, @channel, @exchange = new_connection(connection_options)
end

#connected?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/promiscuous/amqp/bunny.rb', line 74

def connected?
  !!@connection.try(:connected?)
end

#disconnectObject



66
67
68
69
70
71
72
# File 'lib/promiscuous/amqp/bunny.rb', line 66

def disconnect
  @connection_lock.synchronize do
    return unless connected?
    @connection.stop
    @connection = @channel = nil
  end
end

#initialize_driverObject



18
19
20
21
# File 'lib/promiscuous/amqp/bunny.rb', line 18

def initialize_driver
  require 'bunny'
  self.class.hijack_bunny
end

#new_connection(options = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/promiscuous/amqp/bunny.rb', line 42

def new_connection(options={})
  connection = raw_new_connection(options)
  channel = connection.create_channel
  channel.basic_qos(options[:prefetch]) if options[:prefetch]
  raw_confirm_select(channel, &method(:on_confirm)) if options[:confirm]

  if options[:exchanges]
    exchanges = options[:exchanges].map do |exchange_name|
      channel.exchange(exchange_name, :type => :topic, :durable => true)
    end
    [connection, channel, exchanges]
  else
    exchange = channel.exchange(options[:exchange], :type => :topic, :durable => true)
    [connection, channel, exchange]
  end
end

#on_confirm(tag, multiple, nack = false) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/promiscuous/amqp/bunny.rb', line 97

def on_confirm(tag, multiple, nack=false)
  if multiple
    cbs = @callback_mapping.keys
            .select { |k| k <= tag }
            .map    { |k| @callback_mapping.delete(k) }
    cbs.each(&:call) unless nack
  else
    cb = @callback_mapping.delete(tag)
    cb.try(:call) unless nack
  end
end

#publish(options = {}) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/promiscuous/amqp/bunny.rb', line 82

def publish(options={})
  options[:exchange] ||= @exchange
  Promiscuous.debug "[publish] #{options[:exchange].name}/#{options[:key]} #{options[:payload]}"

  @connection_lock.synchronize do
    tag = @channel.next_publish_seq_no if options[:on_confirm]
    raw_publish(options)
    @callback_mapping[tag] = options[:on_confirm] if options[:on_confirm]
  end
rescue Exception => e
  e = Promiscuous::Error::Publisher.new(e, :payload => options[:payload])
  Promiscuous.warn "[publish] #{e}\n#{e.backtrace.join("\n")}"
  Promiscuous::Config.error_notifier.call(e)
end

#raw_confirm_select(channel, &callback) ⇒ Object



38
39
40
# File 'lib/promiscuous/amqp/bunny.rb', line 38

def raw_confirm_select(channel, &callback)
  channel.confirm_select(callback)
end

#raw_new_connection(options = {}) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/promiscuous/amqp/bunny.rb', line 30

def raw_new_connection(options={})
  connection = ::Bunny.new(options[:url], :heartbeat_interval => Promiscuous::Config.heartbeat,
                                          :socket_timeout     => Promiscuous::Config.socket_timeout,
                                          :connect_timeout    => Promiscuous::Config.socket_timeout)
  connection.start
  connection
end

#raw_publish(options) ⇒ Object



78
79
80
# File 'lib/promiscuous/amqp/bunny.rb', line 78

def raw_publish(options)
  options[:exchange].publish(options[:payload], :key => options[:key], :persistent => true)
end