Module: Promiscuous::AMQP::RubyAMQP

Defined in:
lib/promiscuous/amqp/ruby-amqp.rb

Class Method Summary collapse

Class Method Details

.build_connection_options(options) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 13

def self.build_connection_options(options)
  if options[:server_uri]
    uri = URI.parse(options[:server_uri])
    raise "Please use amqp://user:password@host:port/vhost" if uri.scheme != 'amqp'

    {
      :host => uri.host,
      :port => uri.port,
      :scheme => uri.scheme,
      :user => uri.user,
      :pass => uri.password,
      :vhost => uri.path.empty? ? "/" : uri.path,
   }
  end
end

.closeObject



48
49
50
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 48

def self.close
  channel.close
end

.configure(options) ⇒ Object



6
7
8
9
10
11
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 6

def self.configure(options)
  require 'amqp'
  connection = ::AMQP.connect(build_connection_options(options))
  self.channel = ::AMQP::Channel.new(connection)
  self.queue_options = options[:queue_options] || {}
end

.publish(msg) ⇒ Object



42
43
44
45
46
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 42

def self.publish(msg)
  AMQP.info "[publish] #{msg[:key]} -> #{msg[:payload]}"
  exchange = channel.topic('promiscuous', :durable => true)
  exchange.publish(msg[:payload], :routing_key => msg[:key], :persistent => true)
end

.subscribe(options = {}, &block) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 29

def self.subscribe(options={}, &block)
  queue_name = options[:queue_name]
  bindings   = options[:bindings]

  queue = self.channel.queue(queue_name, self.queue_options)
  exchange = channel.topic('promiscuous', :durable => true)
  bindings.each do |binding|
    queue.bind(exchange, :routing_key => binding)
    AMQP.info "[bind] #{queue_name} -> #{binding}"
  end
  queue.subscribe(:ack => true, &block)
end