Module: Promiscuous::AMQP::Bunny::Subscriber
- Included in:
- HotBunnies::Subscriber
- Defined in:
- lib/promiscuous/amqp/bunny.rb
Defined Under Namespace
Classes: MetaData
Instance Method Summary collapse
- #ack_message(tag) ⇒ Object
- #disconnect ⇒ Object
- #postpone_message ⇒ Object
- #recover ⇒ Object
- #subscribe(options = {}, &block) ⇒ Object
- #subscribe_queue(queue, &block) ⇒ Object
Instance Method Details
#ack_message(tag) ⇒ Object
151 152 153 |
# File 'lib/promiscuous/amqp/bunny.rb', line 151 def (tag) @lock.synchronize { @channel.ack(tag) } if @channel end |
#disconnect ⇒ Object
168 169 170 |
# File 'lib/promiscuous/amqp/bunny.rb', line 168 def disconnect @lock.synchronize { @connection.stop; @channel = nil } end |
#postpone_message ⇒ Object
155 156 157 158 159 160 161 162 |
# File 'lib/promiscuous/amqp/bunny.rb', line 155 def # Not using nacks, because the message gets sent back right away so this # is a no-op. # TODO: Even though the prefetch window is set to 10mil we should still # check that the unacked messages doesn't exceed this limit and increase # the prefetch window. end |
#recover ⇒ Object
164 165 166 |
# File 'lib/promiscuous/amqp/bunny.rb', line 164 def recover @lock.synchronize { @channel.basic_recover(true) } if @channel end |
#subscribe(options = {}, &block) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/promiscuous/amqp/bunny.rb', line 110 def subscribe(={}, &block) @lock = Mutex.new @prefetch = Promiscuous::Config.prefetch = { :url => Promiscuous::Config.subscriber_amqp_url, :exchanges => [:bindings].keys, :prefetch => @prefetch } @connection, @channel, exchanges = Promiscuous::AMQP.new_connection() @queue = @channel.queue(Promiscuous::Config.queue_name, Promiscuous::Config.) exchanges.zip([:bindings].values).each do |exchange, bindings| bindings.each do |binding| @queue.bind(exchange, :routing_key => binding) Promiscuous.debug "[bind] #{exchange.name}/#{binding}/#{Promiscuous::Config.queue_name}" end end @subscription = subscribe_queue(@queue, &block) end |