Class: Qswarm::Broker

Inherits:
Object
  • Object
show all
Extended by:
DSL
Includes:
Loggable
Defined in:
lib/qswarm/broker.rb

Constant Summary collapse

@@connection =
{}

Instance Method Summary collapse

Methods included from DSL

dsl_accessor

Methods included from Loggable

#logger, logger

Constructor Details

#initialize(name, &block) ⇒ Broker

Returns a new instance of Broker.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/qswarm/broker.rb', line 15

# Creates a broker description, applies defaults, then lets the optional
# configuration block override them.
#
# The block is evaluated with instance_eval, so it runs with this broker as
# +self+ and can set the instance variables below (or call whatever
# configuration methods the class exposes).
#
# Also installs a process-wide INT handler that closes this broker's pooled
# connection. Signal.trap replaces any previously installed INT handler.
#
# @param name [Object] identifier stored in @name
# @param block [Proc, nil] optional configuration block
def initialize(name, &block)
  @name          = name

  # Defaults; the configuration block may override any of these.
  @host          = 'localhost'
  @port          = 5672
  @user          = 'guest'
  @pass          = 'guest'
  @vhost         = ''
  @exchange_type = :direct
  @exchange_name = ''
  @durable       = true
  @prefetch      = nil

  # instance_eval raises ArgumentError when handed no block, so a broker
  # that only wants the defaults must skip this step.
  instance_eval(&block) if block

  @queues        = {}
  @channels      = {}
  @exchange      = nil

  Signal.trap("INT") do
    # The pool key is computed when the signal arrives, as before.
    pooled = @@connection["#{@host}:#{@port}#{@vhost}"]
    if pooled
      pooled.close do
        EM.stop { exit }
      end
    else
      # No connection was ever opened for this broker, so there is nothing
      # to close; calling #close on nil would raise inside the handler.
      exit
    end
  end
end

Instance Method Details

#channel(name, routing_key = '') ⇒ Object

ruby-amqp currently allows only one consumer per queue (to be fixed in a future release), so channels cannot be pooled; a separate channel is opened for each name/routing-key pair.



67
68
69
70
71
72
73
74
75
76
# File 'lib/qswarm/broker.rb', line 67

# Returns the channel cached under "name/routing_key", opening a new one the
# first time that pair is requested.
#
# @param name [Object] first half of the cache key
# @param routing_key [Object] second half of the cache key
# @return [Object] the cached or newly created AMQP::Channel
def channel(name, routing_key = '')
  cache_key = "#{name}/#{routing_key}"
  existing = @channels[cache_key]
  return existing if existing

  logger.debug "Opening channel for #{name}/#{routing_key}"
  @channels[cache_key] = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :auto_recovery => true) do |_opened|
    # Look the channel up again when this callback runs, then attach an
    # error handler that logs the close reason.
    @channels[cache_key].on_error do |_failed, close_frame|
      logger.error "[channel.close] Reply code = #{close_frame.reply_code}, reply text = #{close_frame.reply_text}"
    end
  end
end

#connection ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/qswarm/broker.rb', line 78

# Returns the AMQP connection for this broker, opening it on first use.
#
# Connections are kept in the class-level @@connection hash under the key
# "host:port" followed by the vhost, so they are pooled at class level.
#
# NOTE(review): the debug line logs the full URI from #to_s, which contains
# the password — confirm that is acceptable for your log destination.
#
# @return [Object] whatever AMQP.connect returned for this host/port/vhost
def connection
  pool_key = "#{@host}:#{@port}#{@vhost}"
  pooled = @@connection[pool_key]
  return pooled if pooled

  logger.debug "Connecting to AMQP broker at #{self.to_s}"

  # Both startup failure callbacks log the target and stop the reactor.
  tcp_failure = Proc.new { |settings|
    logger.error "AMQP initial connection failure to #{settings[:host]}:#{settings[:port]}#{settings[:vhost]}"
    EM.stop
  }
  auth_failure = Proc.new { |settings|
    logger.error "AMQP initial authentication failed for #{settings[:host]}:#{settings[:port]}#{settings[:vhost]}"
    EM.stop
  }

  @@connection[pool_key] = AMQP.connect(self.to_s, :heartbeat => 30, :on_tcp_connection_failure => tcp_failure, :on_possible_authentication_failure => auth_failure) do |_opened|
    # Handlers are attached to the pooled entry, looked up when this
    # callback runs.
    session = @@connection[pool_key]

    session.on_recovery do |_recovered|
      logger.debug "Recovered from AMQP network failure"
    end

    session.on_connection_interruption do |connection|
      # Retry after 10 seconds.
      logger.error "AMQP connection interruption, reconnecting in 10s"
      connection.reconnect(false, 10)
    end

    # Skipped heartbeats are only logged here; this handler does not itself
    # trigger a reconnect.
    session.on_skipped_heartbeats do |_connection, _settings|
      logger.error "Skipped heartbeats detected"
    end

    session.on_error do |connection, connection_close|
      logger.error "AMQP connection has been closed. Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"
      # Only reply code 320 arms the periodic (30s) reconnect timer.
      if connection_close.reply_code == 320
        logger.error "Set a 30s reconnection timer"
        connection.periodically_reconnect(30)
      end
    end
  end
end

#exchange(channel = nil) ⇒ Object



55
56
57
58
59
60
61
62
63
64
# File 'lib/qswarm/broker.rb', line 55

# Returns this broker's exchange, declaring it on first use and memoizing it
# in @exchange. Later calls return the memoized exchange and ignore +channel+.
#
# @param channel [Object, nil] channel to declare the exchange on; when nil a
#   new auto-recovering AMQP::Channel is opened on #connection
# @return [Object] the memoized AMQP::Exchange
def exchange(channel = nil)
  @exchange ||= begin
    channel ||= AMQP::Channel.new(connection, :auto_recovery => true)
    AMQP::Exchange.new(channel, @exchange_type, @exchange_name, :durable => @durable) do |declared|
      logger.debug "Declared #{@exchange_type} exchange #{@vhost}/#{@exchange_name}"
      # Register on the yielded exchange rather than @exchange, which is not
      # assigned until AMQP::Exchange.new has returned.
      declared.on_return do |basic_return, _metadata, payload|
        logger.error "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
      end
    end
  end
end

#queue(name, routing_key = '', args = nil) ⇒ Object



48
49
50
51
52
53
# File 'lib/qswarm/broker.rb', line 48

# Returns the queue cached under "name/routing_key", declaring it and binding
# it to this broker's exchange the first time that pair is requested.
#
# @param name [Object] queue name, also the first half of the cache key
# @param routing_key [Object] binding routing key, second half of the cache key
# @param args [Hash, nil] options forwarded to the channel's #queue call;
#   nil is treated as "no options"
# @return [Object] the result of binding the queue to the exchange
def queue(name, routing_key = '', args = nil)
  @queues["#{name}/#{routing_key}"] ||= begin
    logger.debug "Binding queue #{name}/#{routing_key}"
    queue_channel = channel(name, routing_key)
    # Pass an empty options hash instead of nil so the default +args+ never
    # reaches the channel as a nil options argument.
    queue_channel.queue(name, args || {}).bind(exchange(queue_channel), :routing_key => routing_key)
  end
end

#to_s ⇒ Object



114
115
116
# File 'lib/qswarm/broker.rb', line 114

# Builds the amqp:// connection URI for this broker.
#
# User, password and vhost are percent-encoded with CGI.escape so that
# reserved characters such as "@", ":" or "/" in any of them cannot corrupt
# the URI structure. Host and port are inserted as-is.
#
# @return [String] URI of the form amqp://user:pass@host:port/vhost
def to_s
  user  = CGI.escape(@user.to_s)
  pass  = CGI.escape(@pass.to_s)
  vhost = CGI.escape(@vhost.to_s)
  "amqp://#{user}:#{pass}@#{@host}:#{@port}/#{vhost}"
end