Class: GerritEventRouter::Broker::AMQP
- Defined in:
- lib/ger/broker/amqp.rb
Defined Under Namespace
Classes: Config
Constant Summary collapse
- HEADER =
'[Broker::AMQP]'
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
Instance Method Summary collapse
- #auth_failure(err) ⇒ Object
- #channel ⇒ Object
- #conn_failure(err) ⇒ Object
- #conn_intp(connection) ⇒ Object
- #conn_loss(connection, settings) ⇒ Object
- #connect(&block) ⇒ Object
- #generate_channel(connection = nil) ⇒ Object
-
#initialize(broker) ⇒ AMQP
constructor
A new instance of AMQP.
- #send(data, param) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(broker) ⇒ AMQP
Returns a new instance of AMQP.
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/ger/broker/amqp.rb', line 20

# Sets up the default publish headers and clears the connection state.
#
# The headers always carry the JSON content type, the lower-cased
# GER::NAME as the application id and the persistent flag. When the
# broker URI contains a user part, it is added as :user_id.
#
# @param broker the broker settings object handed to the superclass
#   constructor; it is read back through @broker and must respond to #uri
def initialize(broker)
  super(broker)

  @headers = {
    :content_type => 'application/json',
    :app_id       => GER::NAME.downcase,
    :persistent   => true
  }

  # Only advertise a user id when the URI actually names one.
  uri_user = URI.parse(@broker.uri).user
  @headers[:user_id] = uri_user if uri_user

  # Both are filled in later by #connect / #generate_channel.
  @connection = nil
  @exchange = nil
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
107 108 109 |
# File 'lib/ger/broker/amqp.rb', line 107

# Reader for the @connection instance variable.
#
# @return [Object, nil] whatever is currently stored in @connection
def connection
  @connection
end
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
107 108 109 |
# File 'lib/ger/broker/amqp.rb', line 107

# Reader for the @exchange instance variable.
#
# @return [Object, nil] whatever is currently stored in @exchange
def exchange
  @exchange
end
Instance Method Details
#auth_failure(err) ⇒ Object
88 89 90 91 |
# File 'lib/ger/broker/amqp.rb', line 88

# Logs an authentication failure and stops the EventMachine reactor if it
# is running.
#
# The message interpolates the +err+ argument. The previous version
# referenced +afe+, a name that does not exist in this method, so the
# handler itself raised NameError instead of logging.
#
# @param err [Exception] the error caught by the caller
# @return the result of EM.stop, or nil when the reactor is not running
def auth_failure(err)
  GER.logger.error "#{HEADER} Authentication failed, as expcted, caught #{err.inspect}"
  EM.stop if EM.reactor_running?
end
#channel ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/ger/broker/amqp.rb', line 60

# Returns the channel of the current exchange.
#
# @return [Object, nil] @exchange.channel, or nil while no exchange is set
def channel
  return nil unless @exchange

  @exchange.channel
end
#conn_failure(err) ⇒ Object
83 84 85 86 |
# File 'lib/ger/broker/amqp.rb', line 83

# Logs a TCP connection failure and stops the EventMachine reactor if it
# is running.
#
# @param err the error caught by the caller (not used in the message)
# @return the result of EM.stop, or nil when the reactor is not running
def conn_failure(err)
  GER.logger.error("#{HEADER} TCP connection failed, as expcted.")
  return unless EM.reactor_running?

  EM.stop
end
#conn_intp(connection) ⇒ Object
100 101 102 103 104 105 |
# File 'lib/ger/broker/amqp.rb', line 100

# Connection-interruption callback: when the connection reports an error,
# logs a warning and asks it to reconnect.
#
# @param connection the connection object; must respond to #error? and
#   #reconnect
# @return the result of connection.reconnect, or nil when there is no error
def conn_intp(connection)
  return unless connection.error?

  GER.logger.warn("#{HEADER} Connection interrupton. reconnectiong...")
  connection.reconnect(false, 1)
end
#conn_loss(connection, settings) ⇒ Object
93 94 95 96 97 98 |
# File 'lib/ger/broker/amqp.rb', line 93

# TCP-connection-loss callback: when the connection reports an error, logs
# a warning and asks it to reconnect.
#
# @param connection the connection object; must respond to #error? and
#   #reconnect
# @param settings passed in by the caller; not used here
# @return the result of connection.reconnect, or nil when there is no error
def conn_loss(connection, settings)
  return unless connection.error?

  GER.logger.warn("#{HEADER} Connection lost. reconnectiong...")
  connection.reconnect(false, 1)
end
#connect(&block) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/ger/broker/amqp.rb', line 33

# Opens the AMQP connection for @broker.uri and registers the
# loss / interruption / recovery callbacks on it.
#
# Inside the connect callback a channel and exchange are created through
# #generate_channel, after which the optional block is called with this
# broker instance.
#
# @yield [self] called from the connect callback, if a block is given
# @return [self] on success; on a rescued authentication or TCP failure,
#   the return value of #auth_failure / #conn_failure instead
def connect(&block)
  @connection = ::AMQP.connect(@broker.uri) do |connection|
    generate_channel(connection)
    block.call self if block
  end

  @connection.on_tcp_connection_loss(&method(:conn_loss))
  @connection.on_connection_interruption(&method(:conn_intp))
  # A Method-derived proc enforces the method's arity, and
  # #generate_channel accepts at most one argument. The amqp gem's guides
  # show recovery callbacks receiving |connection, settings|, so forward
  # only the connection through a plain block, which ignores extras.
  @connection.on_recovery { |conn, _settings| generate_channel(conn) }

  self
rescue ::AMQP::PossibleAuthenticationFailureError => afe
  auth_failure(afe)
rescue ::AMQP::TCPConnectionFailed => e
  conn_failure(e)
end
#generate_channel(connection = nil) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/ger/broker/amqp.rb', line 68

# Creates a new channel on the given (or current) connection and declares
# the broker's exchange on it, storing the exchange in @exchange.
#
# The exchange type and name are read from @broker.exchange ('type' and
# 'name' keys); it is declared durable and not auto-deleted.
#
# @param connection the connection to open the channel on; falls back to
#   @connection when nil
# @return the newly created exchange
def generate_channel(connection = nil)
  amqp_channel = ::AMQP::Channel.new(connection || @connection)
  amqp_channel.auto_recovery = true
  # Surface channel-level errors as exceptions carrying the broker's reply.
  amqp_channel.on_error { |ch, channel_close| raise channel_close.reply_text }

  exchange_type = @broker.exchange['type'].to_sym
  exchange_name = @broker.exchange['name']
  @exchange = ::AMQP::Exchange.new(amqp_channel, exchange_type, exchange_name,
                                   :durable => true, :auto_delete => false)
end
#send(data, param) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/ger/broker/amqp.rb', line 52

# Publishes +data+ to the exchange with the default headers, the caller's
# +param+ and a fresh :timestamp (current time as integer seconds).
#
# The caller's +param+ hash is left untouched: the timestamp is added to a
# merged copy rather than written into +param+. Values in +param+ override
# the default headers; the timestamp overrides both.
#
# NOTE: this method shares its name with Object#send, so dynamic dispatch
# on instances of this class should use __send__.
#
# @param data the payload handed to the exchange's #publish
# @param param [Hash] per-message publish options (e.g. :routing_key)
# @return the result of @exchange.publish
def send(data, param)
  options = @headers.merge(param).merge(:timestamp => Time.now.to_i)

  @exchange.publish(data, options) do
    GER.logger.info "#{HEADER} Published time: #{options[:timestamp]}, routing: #{param[:routing_key]||'All'}"
    GER.logger.debug "#{HEADER} Published content: #{data}"
  end
end