Class: Qswarm::Connections::Amqp

Inherits:
Qswarm::Connection show all
Includes:
DSL
Defined in:
lib/qswarm/connections/amqp.rb

Constant Summary collapse

@@connection =

dsl_accessor :name, :host, :port, :user, :pass, :vhost, :exchange_type, :exchange_name, :durable

{}

Instance Attribute Summary

Attributes inherited from Qswarm::Connection

#format

Instance Method Summary collapse

Methods included from DSL

#dsl_call, #dsl_load, included

Constructor Details

#initialize(agent, name, args, &block) ⇒ Amqp

Returns a new instance of Amqp.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/qswarm/connections/amqp.rb', line 15

def initialize(agent, name, args, &block)
  # Set some defaults
  @host          = 'localhost'
  @port          = 5672
  @user          = 'guest'
  @pass          = 'guest'
  @vhost         = ''
  @durable       = true
  @prefetch      = args[:prefetch] || 0

  decode_uri(args[:uri]) if args[:uri]

  @queues        = {}
  @channels      = {}
  @exchange      = nil
  @instances     = nil

  @queue_args     = { :auto_delete => true, :durable => true, :exclusive => true }.merge! args[:queue_args] || {}
  @subscribe_args = { :exclusive => false, :ack => false }.merge! args[:subscribe_args] || {}
  @bind_args      = args[:bind_args] || {}
  @exchange_type  = args[:exchange_type] || :direct
  @exchange_name  = args[:exchange_name] || ''
  @exchange_args  = { :durable => true }.merge! args[:exchange_args] || {}
  @uuid           = UUID.generate if args[:uniq]
  @bind           = args[:bind]

  Signal.trap("INT") do
    @@connection["#{@host}:#{@port}/#{@vhost}"].close do
      EM.stop { exit }
    end
  end

  super
end

Instance Method Details

#ack?Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/qswarm/connections/amqp.rb', line 124

def ack?
  @subscribe_args[:ack]
end

#channel(name, routing_key = '') ⇒ Object

ruby-amqp currently limits to 1 consumer per queue (to be fixed in future) so can’t pool channels



69
70
71
72
73
74
75
76
77
78
# File 'lib/qswarm/connections/amqp.rb', line 69

def channel(name, routing_key = '')
  @channels["#{name}/#{routing_key}"] ||= begin
    Qswarm.logger.debug "Opening channel for #{name}/#{routing_key}"
    @channels["#{name}/#{routing_key}"] = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :auto_recovery => true, :prefetch => @prefetch) do |c|
      @channels["#{name}/#{routing_key}"].on_error do |channel, channel_close|
        Qswarm.logger.error "[channel.close] Reply code = #{channel_close.reply_code}, reply text = #{channel_close.reply_text}"
      end
    end
  end
end

#connectionObject



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/qswarm/connections/amqp.rb', line 80

def connection
  # Pool connections at the class level
  @@connection["#{@host}:#{@port}/#{@vhost}"] ||= begin
    Qswarm.logger.debug "Connecting to AMQP broker #{self.to_s}"
    @@connection["#{@host}:#{@port}/#{@vhost}"] = AMQP.connect(self.to_s, :heartbeat => 30, :on_tcp_connection_failure => Proc.new { |settings|
        Qswarm.logger.error "AMQP initial connection failure to #{settings[:host]}:#{settings[:port]}/#{settings[:vhost]}"
        EM.stop
      }, :on_possible_authentication_failure => Proc.new { |settings|
        Qswarm.logger.error "AMQP initial authentication failed for #{settings[:host]}:#{settings[:port]}/#{settings[:vhost]}"
        EM.stop
      }
    ) do |c|
      @@connection["#{@host}:#{@port}/#{@vhost}"].on_recovery do |connection|
        Qswarm.logger.debug "Recovered from AMQP network failure"
      end
      @@connection["#{@host}:#{@port}/#{@vhost}"].on_tcp_connection_loss do |connection|
        # reconnect in 10 seconds
        Qswarm.logger.error "AMQP TCP connection lost, reconnecting in 2s"
        connection.periodically_reconnect(2)
      end
      @@connection["#{@host}:#{@port}/#{@vhost}"].on_connection_interruption do |connection|
        Qswarm.logger.error "AMQP connection interruption"
      end
      # Force reconnect on heartbeat loss to cope with our funny firewall issues
      @@connection["#{@host}:#{@port}/#{@vhost}"].on_skipped_heartbeats do |connection, settings|
        Qswarm.logger.error "Skipped heartbeats detected"
      end
      @@connection["#{@host}:#{@port}/#{@vhost}"].on_error do |connection, connection_close|
        Qswarm.logger.error "AMQP connection has been closed. Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"
        if connection_close.reply_code == 320
          Qswarm.logger.error "Set a 30s reconnection timer"
          # every 30 seconds
          connection.periodically_reconnect(30)
        end
      end
      Qswarm.logger.debug "Connected to AMQP broker #{self.to_s}"
    end
  end
end

#decode_uri(uri) ⇒ Object



120
121
122
# File 'lib/qswarm/connections/amqp.rb', line 120

def decode_uri(uri)
  @user, @pass, @host, @port, @vhost = uri.match(/([^:]+):([^@]+)@([^:]+):([^\/]+)\/(.*)/).captures
end

#emit(metadata, payload) ⇒ Object



148
149
150
151
152
153
# File 'lib/qswarm/connections/amqp.rb', line 148

def emit(, payload)
  Qswarm.logger.info "[#{@agent.name.inspect}] :amqp connection #{@name.inspect} bound to #{.routing_key}, received #{payload.inspect}"

  @agent.emit(@name, :payload => OpenStruct.new(:raw => payload, :headers => (.headers.nil? ? {} : Hash[.headers.map{ |k, v| [k.to_sym, v] }]).merge(:routing_key => .routing_key), :format => @format))
  .ack if ack?
end

#exchange(channel = nil) ⇒ Object



57
58
59
60
61
62
63
64
65
66
# File 'lib/qswarm/connections/amqp.rb', line 57

def exchange(channel = nil)
  @exchange ||= begin
    @exchange = AMQP::Exchange.new(channel ||= AMQP::Channel.new(connection, :auto_recovery => true), @exchange_type, @exchange_name, @exchange_args) do |exchange|
      Qswarm.logger.debug "Declared #{@exchange_type} exchange #{@vhost}/#{@exchange_name}"
      exchange.on_return do |basic_return, , payload|
        Qswarm.logger.error "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
      end
    end
  end
end

#queue(name, routing_key = '', args = nil) ⇒ Object



50
51
52
53
54
55
# File 'lib/qswarm/connections/amqp.rb', line 50

def queue(name, routing_key = '', args = nil)
  @queues["#{name}/#{routing_key}"] ||= begin
    Qswarm.logger.debug "Binding queue #{name}/#{routing_key}"
    @queues["#{name}/#{routing_key}"] = channel(name, routing_key).queue(name, args).bind(exchange(channel(name, routing_key)), @bind_args.merge(:routing_key => routing_key))
  end
end

#runObject



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/qswarm/connections/amqp.rb', line 136

def run
  if !@bind.nil?
    [*@bind].each do |bind|
      queue(@agent.name.to_s + '.' +  @name.to_s + @uuid ||= '', bind, @queue_args).subscribe(@subscribe_args) do |, payload|
        emit , payload
      end
    end

    dsl_call(&@on_connect) if @on_connect
  end
end

#sink(args, payload) ⇒ Object



155
156
157
158
159
160
161
162
163
164
# File 'lib/qswarm/connections/amqp.rb', line 155

def sink(args, payload)
  [*args[:routing_key]].each do |routing_key|
    Qswarm.logger.info "[#{@agent.name.inspect} #{@name.inspect}] Sinking #{payload.raw.inspect} to AMQP routing_key #{routing_key.inspect}"
    if args[:headers] || payload.headers
      exchange.publish payload.raw, :routing_key => routing_key, :headers => (args[:headers] ? args[:headers] : payload.headers).merge(:routing_key => routing_key)
    else
      exchange.publish payload.raw, :routing_key => routing_key
    end
  end
end

#statusObject



132
133
134
# File 'lib/qswarm/connections/amqp.rb', line 132

def status
  "AMQP connection #{@name.inspect} at #{@args[:uri]}, bound to #{@args[:bind]}/#{@args[:bind_args]} on #{@args[:exchange_type].inspect} exchange #{@args[:exchange_name]}"
end

#to_sObject



128
129
130
# File 'lib/qswarm/connections/amqp.rb', line 128

def to_s
  "amqp://#{@user}:#{@pass}@#{@host}:#{@port}/#{CGI.escape('/' + @vhost)}"
end