Class: Qswarm::Broker
- Inherits:
-
Object
- Object
- Qswarm::Broker
- Extended by:
- DSL
- Includes:
- Loggable
- Defined in:
- lib/qswarm/broker.rb
Constant Summary collapse
- @@connection =
{}
Instance Method Summary collapse
-
#channel(name, routing_key = '') ⇒ Object
ruby-amqp currently limits to 1 consumer per queue (to be fixed in future) so can’t pool channels.
- #connection ⇒ Object
- #exchange(channel = nil) ⇒ Object
-
#initialize(name, &block) ⇒ Broker
constructor
A new instance of Broker.
- #queue(name, routing_key = '', args = nil) ⇒ Object
- #to_s ⇒ Object
Methods included from DSL
Methods included from Loggable
Constructor Details
#initialize(name, &block) ⇒ Broker
Returns a new instance of Broker.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/qswarm/broker.rb', line 15 def initialize(name, &block) @name = name # Set some defaults @host = 'localhost' @port = 5672 @user = 'guest' @pass = 'guest' @vhost = '' @exchange_type = :direct @exchange_name = '' @durable = true @prefetch = nil # if block.arity == 1 # yield self # else # instance_eval &block # end self.instance_eval(&block) @queues = {} @channels = {} @exchange = nil Signal.trap("INT") do @@connection["#{@host}:#{@port}#{@vhost}"].close do EM.stop { exit } end end end |
Instance Method Details
#channel(name, routing_key = '') ⇒ Object
ruby-amqp currently limits to 1 consumer per queue (to be fixed in future) so can’t pool channels
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/qswarm/broker.rb', line 67 def channel name, routing_key = '' @channels["#{name}/#{routing_key}"] ||= begin logger.debug "Opening channel for #{name}/#{routing_key}" @channels["#{name}/#{routing_key}"] = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :auto_recovery => true) do |c| @channels["#{name}/#{routing_key}"].on_error do |channel, channel_close| logger.error "[channel.close] Reply code = #{channel_close.reply_code}, reply text = #{channel_close.reply_text}" end end end end |
#connection ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/qswarm/broker.rb', line 78 def connection # Pool connections at the class level @@connection["#{@host}:#{@port}#{@vhost}"] ||= begin logger.debug "Connecting to AMQP broker at #{self.to_s}" @@connection["#{@host}:#{@port}#{@vhost}"] = AMQP.connect(self.to_s, :heartbeat => 30, :on_tcp_connection_failure => Proc.new { |settings| logger.error "AMQP initial connection failure to #{settings[:host]}:#{settings[:port]}#{settings[:vhost]}" EM.stop }, :on_possible_authentication_failure => Proc.new { |settings| logger.error "AMQP initial authentication failed for #{settings[:host]}:#{settings[:port]}#{settings[:vhost]}" EM.stop } ) do |c| @@connection["#{@host}:#{@port}#{@vhost}"].on_recovery do |connection| logger.debug "Recovered from AMQP network failure" end @@connection["#{@host}:#{@port}#{@vhost}"].on_connection_interruption do |connection| # reconnect in 10 seconds logger.error "AMQP connection interruption, reconnecting in 10s" connection.reconnect(false, 10) end # Force reconnect on heartbeat loss to cope with our funny firewall issues @@connection["#{@host}:#{@port}#{@vhost}"].on_skipped_heartbeats do |connection, settings| logger.error "Skipped heartbeats detected" end @@connection["#{@host}:#{@port}#{@vhost}"].on_error do |connection, connection_close| logger.error "AMQP connection has been closed. Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}" if connection_close.reply_code == 320 logger.error "Set a 30s reconnection timer" # every 30 seconds connection.periodically_reconnect(30) end end end end end |
#exchange(channel = nil) ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/qswarm/broker.rb', line 55 def exchange(channel = nil) @exchange ||= begin @exchange = AMQP::Exchange.new(channel ||= AMQP::Channel.new(connection, :auto_recovery => true), @exchange_type, @exchange_name, :durable => @durable) do |exchange| logger.debug "Declared #{@exchange_type} exchange #{@vhost}/#{@exchange_name}" @exchange.on_return do |basic_return, , payload| logger.error "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}" end end end end |
#queue(name, routing_key = '', args = nil) ⇒ Object
48 49 50 51 52 53 |
# File 'lib/qswarm/broker.rb', line 48 def queue name, routing_key = '', args = nil @queues["#{name}/#{routing_key}"] ||= begin logger.debug "Binding queue #{name}/#{routing_key}" @queues["#{name}/#{routing_key}"] = channel(name, routing_key).queue(name, args).bind(exchange(channel(name, routing_key)), :routing_key => routing_key) end end |
#to_s ⇒ Object
114 115 116 |
# File 'lib/qswarm/broker.rb', line 114 def to_s "amqp://#{@user}:#{@pass}@#{@host}:#{@port}/#{CGI.escape(@vhost)}" end |