Module: Promiscuous::AMQP::RubyAMQP

Defined in:
lib/promiscuous/amqp/rubyamqp.rb

Class Method Summary collapse

Class Method Details

.connectObject



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/promiscuous/amqp/rubyamqp.rb', line 6

def self.connect
  require 'amqp'

  amqp_options = if Promiscuous::Config.server_uri
    uri = URI.parse(Promiscuous::Config.server_uri)
    raise "Please use amqp://user:password@host:port/vhost" if uri.scheme != 'amqp'

    {
      :host   => uri.host,
      :port   => uri.port,
      :scheme => uri.scheme,
      :user   => uri.user,
      :pass   => uri.password,
      :vhost  => uri.path.empty? ? "/" : uri.path,
    }
  end

  connection = ::AMQP.connect(amqp_options)
  self.channel = ::AMQP::Channel.new(connection)
end

.disconnectObject



27
28
29
30
# File 'lib/promiscuous/amqp/rubyamqp.rb', line 27

def self.disconnect
  self.channel.close if self.channel
  self.channel = nil
end

.exchange(name) ⇒ Object



50
51
52
# File 'lib/promiscuous/amqp/rubyamqp.rb', line 50

def self.exchange(name)
  channel.topic(name, :durable => true)
end

.open_queue(options = {}, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/promiscuous/amqp/rubyamqp.rb', line 32

def self.open_queue(options={}, &block)
  queue_name = options[:queue_name]
  bindings   = options[:bindings]

  queue = self.channel.queue(queue_name, Promiscuous::Config.queue_options)
  bindings.each do |binding|
    queue.bind(exchange(options[:exchange_name]), :routing_key => binding)
    Promiscuous.info "[bind] #{queue_name} -> #{binding}"
  end
  block.call(queue) if block
end

.publish(options = {}) ⇒ Object



44
45
46
47
48
# File 'lib/promiscuous/amqp/rubyamqp.rb', line 44

def self.publish(options={})
  Promiscuous.info "[publish] (#{options[:exchange_name]}) #{options[:key]} -> #{options[:payload]}"
  exchange(options[:exchange_name]).
    publish(options[:payload], :routing_key => options[:key], :persistent => true)
end