Class: Promiscuous::AMQP::Bunny
- Inherits:
-
Object
- Object
- Promiscuous::AMQP::Bunny
- Defined in:
- lib/promiscuous/amqp/bunny.rb
Direct Known Subclasses
Defined Under Namespace
Modules: Subscriber
Instance Attribute Summary collapse
-
#callback_mapping ⇒ Object
Returns the value of attribute callback_mapping.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#connection_lock ⇒ Object
Returns the value of attribute connection_lock.
Class Method Summary collapse
Instance Method Summary collapse
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
-
#initialize ⇒ Bunny
constructor
A new instance of Bunny.
- #initialize_driver ⇒ Object
- #new_connection(options = {}) ⇒ Object
- #on_confirm(tag, multiple, nack = false) ⇒ Object
- #publish(options = {}) ⇒ Object
- #raw_confirm_select(channel, &callback) ⇒ Object
- #raw_new_connection(options = {}) ⇒ Object
- #raw_publish(options) ⇒ Object
Constructor Details
#initialize ⇒ Bunny
Returns a new instance of Bunny.
23 24 25 26 27 28 |
# File 'lib/promiscuous/amqp/bunny.rb', line 23 def initialize initialize_driver # The bunny socket doesn't like when multiple threads access to it apparently @connection_lock = Mutex.new @callback_mapping = {} end |
Instance Attribute Details
#callback_mapping ⇒ Object
Returns the value of attribute callback_mapping.
16 17 18 |
# File 'lib/promiscuous/amqp/bunny.rb', line 16 def callback_mapping @callback_mapping end |
#connection ⇒ Object
Returns the value of attribute connection.
16 17 18 |
# File 'lib/promiscuous/amqp/bunny.rb', line 16 def connection @connection end |
#connection_lock ⇒ Object
Returns the value of attribute connection_lock.
16 17 18 |
# File 'lib/promiscuous/amqp/bunny.rb', line 16 def connection_lock @connection_lock end |
Class Method Details
.hijack_bunny ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/promiscuous/amqp/bunny.rb', line 2 def self.hijack_bunny return if @bunny_hijacked ::Bunny::Session.class_eval do alias_method :handle_network_failure_without_promiscuous, :handle_network_failure def handle_network_failure(e) Promiscuous.warn "[amqp] #{e}. Reconnecting..." Promiscuous::Config.error_notifier.call(e) handle_network_failure_without_promiscuous(e) end end @bunny_hijacked = true end |
Instance Method Details
#connect ⇒ Object
59 60 61 62 63 64 |
# File 'lib/promiscuous/amqp/bunny.rb', line 59 def connect = { :url => Promiscuous::Config.publisher_amqp_url, :exchange => Promiscuous::Config.publisher_exchange, :confirm => true } @connection, @channel, @exchange = new_connection() end |
#connected? ⇒ Boolean
74 75 76 |
# File 'lib/promiscuous/amqp/bunny.rb', line 74 def connected? !!@connection.try(:connected?) end |
#disconnect ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/promiscuous/amqp/bunny.rb', line 66 def disconnect @connection_lock.synchronize do return unless connected? @connection.stop @connection = @channel = nil end end |
#initialize_driver ⇒ Object
18 19 20 21 |
# File 'lib/promiscuous/amqp/bunny.rb', line 18 def initialize_driver require 'bunny' self.class.hijack_bunny end |
#new_connection(options = {}) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/promiscuous/amqp/bunny.rb', line 42 def new_connection(={}) connection = raw_new_connection() channel = connection.create_channel channel.basic_qos([:prefetch]) if [:prefetch] raw_confirm_select(channel, &method(:on_confirm)) if [:confirm] if [:exchanges] exchanges = [:exchanges].map do |exchange_name| channel.exchange(exchange_name, :type => :topic, :durable => true) end [connection, channel, exchanges] else exchange = channel.exchange([:exchange], :type => :topic, :durable => true) [connection, channel, exchange] end end |
#on_confirm(tag, multiple, nack = false) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/promiscuous/amqp/bunny.rb', line 97 def on_confirm(tag, multiple, nack=false) if multiple cbs = @callback_mapping.keys .select { |k| k <= tag } .map { |k| @callback_mapping.delete(k) } cbs.each(&:call) unless nack else cb = @callback_mapping.delete(tag) cb.try(:call) unless nack end end |
#publish(options = {}) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/promiscuous/amqp/bunny.rb', line 82 def publish(={}) [:exchange] ||= @exchange Promiscuous.debug "[publish] #{[:exchange].name}/#{[:key]} #{[:payload]}" @connection_lock.synchronize do tag = @channel.next_publish_seq_no if [:on_confirm] raw_publish() @callback_mapping[tag] = [:on_confirm] if [:on_confirm] end rescue Exception => e e = Promiscuous::Error::Publisher.new(e, :payload => [:payload]) Promiscuous.warn "[publish] #{e}\n#{e.backtrace.join("\n")}" Promiscuous::Config.error_notifier.call(e) end |
#raw_confirm_select(channel, &callback) ⇒ Object
38 39 40 |
# File 'lib/promiscuous/amqp/bunny.rb', line 38 def raw_confirm_select(channel, &callback) channel.confirm_select(callback) end |
#raw_new_connection(options = {}) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/promiscuous/amqp/bunny.rb', line 30 def raw_new_connection(={}) connection = ::Bunny.new([:url], :heartbeat_interval => Promiscuous::Config.heartbeat, :socket_timeout => Promiscuous::Config.socket_timeout, :connect_timeout => Promiscuous::Config.socket_timeout) connection.start connection end |
#raw_publish(options) ⇒ Object
78 79 80 |
# File 'lib/promiscuous/amqp/bunny.rb', line 78 def raw_publish() [:exchange].publish([:payload], :key => [:key], :persistent => true) end |