Module: Promiscuous::AMQP::RubyAMQP
- Defined in:
- lib/promiscuous/amqp/ruby-amqp.rb
Class Method Summary
- .build_connection_options(options) ⇒ Object
- .close ⇒ Object
- .configure(options) ⇒ Object
- .publish(msg) ⇒ Object
- .subscribe(options = {}, &block) ⇒ Object
Class Method Details
.build_connection_options(options) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 13

# Translates the +:server_uri+ entry of +options+ into a hash of connection
# settings.
#
# @param options [Hash] configuration hash; only +:server_uri+ is read here
# @return [Hash, nil] +:host+, +:port+, +:scheme+, +:user+, +:pass+ and
#   +:vhost+ extracted from the URI, or +nil+ when +:server_uri+ is not set
# @raise [RuntimeError] when the URI scheme is anything other than +amqp+
def self.build_connection_options(options)
  # Without a server URI there is nothing to translate; the caller gets nil.
  return unless options[:server_uri]

  uri = URI.parse(options[:server_uri])
  raise "Please use amqp://user:password@host:port/vhost" if uri.scheme != 'amqp'

  {
    :host   => uri.host,
    :port   => uri.port,
    :scheme => uri.scheme,
    :user   => uri.user,
    :pass   => uri.password,
    # An empty path falls back to "/"; otherwise the URI path is used
    # verbatim, leading slash included.
    :vhost  => uri.path.empty? ? "/" : uri.path,
  }
end
.close ⇒ Object
48 49 50 |
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 48

# Closes the module's current channel.
#
# @return [Object] whatever the channel's +close+ call returns
def self.close
  self.channel.close
end
.configure(options) ⇒ Object
6 7 8 9 10 11 |
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 6

# Opens a connection through the amqp gem, creates a channel on it, and
# stores the channel and the queue options on the module.
#
# @param options [Hash] configuration hash; it is handed to
#   +build_connection_options+, and +:queue_options+ is read from it
# @return [Hash] the stored queue options (an empty hash when none are given)
def self.configure(options)
  # Required here rather than at file load, so the gem is only needed once
  # this method is actually called.
  require 'amqp'

  connection = ::AMQP.connect(build_connection_options(options))
  self.channel = ::AMQP::Channel.new(connection)
  # NOTE(review): the writer name `queue_options` is reconstructed from the
  # option key it is assigned from — confirm against the module's accessors.
  self.queue_options = options[:queue_options] || {}
end
.publish(msg) ⇒ Object
42 43 44 45 46 |
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 42

# Logs the message, then publishes its payload to the durable 'promiscuous'
# topic exchange as a persistent message.
#
# @param msg [Hash] read at +:key+ (used as the routing key) and +:payload+
# @return [Object] whatever the exchange's +publish+ call returns
def self.publish(msg)
  routing_key = msg[:key]
  payload     = msg[:payload]
  AMQP.info "[publish] #{routing_key} -> #{payload}"

  topic = channel.topic('promiscuous', :durable => true)
  topic.publish(payload, :routing_key => routing_key, :persistent => true)
end
.subscribe(options = {}, &block) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/promiscuous/amqp/ruby-amqp.rb', line 29

# Declares a queue, binds it to the durable 'promiscuous' topic exchange once
# per routing key, and subscribes the given block with acknowledgements
# enabled.
#
# @param options [Hash] read at +:queue_name+ and +:bindings+
#   (+:bindings+ must respond to +each+ and yield routing keys)
# @param block [Proc] handler forwarded to the queue's +subscribe+ call
# @return [Object] whatever the queue's +subscribe+ call returns
def self.subscribe(options = {}, &block)
  queue_name = options[:queue_name]
  bindings   = options[:bindings]

  # NOTE(review): the reader name `queue_options` is reconstructed to match
  # the `:queue_options` key stored by `configure` — confirm.
  queue    = self.channel.queue(queue_name, self.queue_options)
  exchange = channel.topic('promiscuous', :durable => true)

  # The block variable is deliberately not called `binding`, which would
  # shadow Kernel#binding; the logged text is unchanged.
  bindings.each do |routing_key|
    queue.bind(exchange, :routing_key => routing_key)
    AMQP.info "[bind] #{queue_name} -> #{routing_key}"
  end

  queue.subscribe(:ack => true, &block)
end