Module: Promiscuous::AMQP::Bunny::Subscriber

Included in:
HotBunnies::Subscriber
Defined in:
lib/promiscuous/amqp/bunny.rb

Defined Under Namespace

Classes: MetaData

Instance Method Summary

Instance Method Details

#ack_message(tag) ⇒ Object



151
152
153
# File 'lib/promiscuous/amqp/bunny.rb', line 151

# Acknowledges the delivery identified by +tag+ on the current channel.
#
# Does nothing (and returns nil) when there is no channel, e.g. after
# #disconnect has cleared it.
#
# @param tag the delivery tag passed through to the channel's +ack+
# @return whatever the channel's +ack+ returns, or nil when there is no channel
def ack_message(tag)
  # Cheap early exit; also keeps this a no-op when @lock has not been
  # created yet (it is only assigned in #subscribe).
  return unless @channel

  # Re-check under the lock: #disconnect sets @channel to nil while holding
  # the same lock, so the channel may have vanished since the check above.
  @lock.synchronize { @channel.ack(tag) if @channel }
end

#disconnect ⇒ Object



168
169
170
# File 'lib/promiscuous/amqp/bunny.rb', line 168

# Stops the connection and forgets the channel, all while holding the lock
# so that it cannot interleave with other operations guarded by that lock.
#
# @return [nil]
def disconnect
  @lock.synchronize do
    @connection.stop
    @channel = nil
  end
end

#postpone_message ⇒ Object



155
156
157
158
159
160
161
162
# File 'lib/promiscuous/amqp/bunny.rb', line 155

# Deliberate no-op.
#
# @return [nil]
def postpone_message
  # Nacks are not used here: the message would get sent back right away,
  # so postponing is effectively a no-op.
  #
  # TODO: Even though the prefetch window is set to 10mil, we should still
  # check that the number of unacked messages does not exceed this limit,
  # and increase the prefetch window if it does.
  nil
end

#recover ⇒ Object



164
165
166
# File 'lib/promiscuous/amqp/bunny.rb', line 164

# Calls +basic_recover(true)+ on the current channel.
#
# Does nothing (and returns nil) when there is no channel, e.g. after
# #disconnect has cleared it.
#
# @return whatever the channel's +basic_recover+ returns, or nil when there
#   is no channel
def recover
  # Cheap early exit; also keeps this a no-op when @lock has not been
  # created yet (it is only assigned in #subscribe).
  return unless @channel

  # Re-check under the lock: #disconnect sets @channel to nil while holding
  # the same lock, so the channel may have vanished since the check above.
  @lock.synchronize { @channel.basic_recover(true) if @channel }
end

#subscribe(options = {}, &block) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/promiscuous/amqp/bunny.rb', line 110

# Opens a subscriber connection, declares the configured queue, binds it to
# every requested exchange/routing key, and starts consuming.
#
# @param options [Hash] must contain +:bindings+, a hash mapping each
#   exchange identifier to the list of routing keys to bind on it
# @param block handler forwarded to #subscribe_queue
# @return the value returned by #subscribe_queue (also kept in @subscription)
def subscribe(options={}, &block)
  @lock = Mutex.new
  @prefetch = Promiscuous::Config.prefetch

  # new_connection hands back the connection, the channel, and one exchange
  # per requested key; they are paired with the binding lists by position below.
  @connection, @channel, exchanges = Promiscuous::AMQP.new_connection(
    :url       => Promiscuous::Config.subscriber_amqp_url,
    :exchanges => options[:bindings].keys,
    :prefetch  => @prefetch
  )

  @queue = @channel.queue(Promiscuous::Config.queue_name, Promiscuous::Config.queue_options)

  routing_keys_per_exchange = options[:bindings].values
  exchanges.zip(routing_keys_per_exchange).each do |(exchange, routing_keys)|
    routing_keys.each do |routing_key|
      @queue.bind(exchange, :routing_key => routing_key)
      Promiscuous.debug "[bind] #{exchange.name}/#{routing_key}/#{Promiscuous::Config.queue_name}"
    end
  end

  @subscription = subscribe_queue(@queue, &block)
end

#subscribe_queue(queue, &block) ⇒ Object



130
131
132
133
134
# File 'lib/promiscuous/amqp/bunny.rb', line 130

# Starts consuming +queue+ with +:ack => true+ and hands every delivery to
# +block+.
#
# @param queue the queue to consume from; must respond to +subscribe+ and
#   yield three values per delivery
# @param block called with a MetaData (built from this subscriber and the
#   delivery info) and the message payload
# @return whatever +queue.subscribe+ returns
def subscribe_queue(queue, &block)
  # The second yielded value is not used by this subscriber (presumably the
  # AMQP message properties/metadata -- confirm against the client library).
  # It still needs a name: an empty slot in the parameter list is a syntax error.
  queue.subscribe(:ack => true) do |delivery_info, _properties, payload|
    block.call(MetaData.new(self, delivery_info), payload)
  end
end