Class: GerritEventRouter::Broker::AMQP

Inherits:
Base
  • Object
show all
Defined in:
lib/ger/broker/amqp.rb

Defined Under Namespace

Classes: Config

Constant Summary collapse

HEADER =
'[Broker::AMQP]'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#header

Constructor Details

#initialize(broker) ⇒ AMQP

Returns a new instance of AMQP.



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/ger/broker/amqp.rb', line 20

# Builds the broker wrapper and prepares the default message properties
# that #send merges into every publish call.
#
# @param broker [Object] broker configuration, forwarded to the superclass
#   initializer; @broker.uri is read afterwards, so the superclass is
#   presumably what stores it in @broker (verify against Base)
def initialize(broker)
  super(broker)

  # Nothing is connected yet; #connect and #generate_channel fill these in.
  @connection = nil
  @exchange = nil

  @headers = {
    :content_type => 'application/json',
    :app_id => GER::NAME.downcase,
    :persistent => true
  }

  # Only set :user_id when the broker URI actually carries a user part.
  user = URI.parse(@broker.uri).user
  @headers[:user_id] = user if user
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



107
108
109
# File 'lib/ger/broker/amqp.rb', line 107

# Reader for @connection. The constructor sets it to nil and #connect
# assigns it the value returned by ::AMQP.connect.
def connection
  @connection
end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



107
108
109
# File 'lib/ger/broker/amqp.rb', line 107

# Reader for @exchange. The constructor sets it to nil and
# #generate_channel assigns it a new ::AMQP::Exchange.
def exchange
  @exchange
end

Instance Method Details

#auth_failure(err) ⇒ Object



88
89
90
91
# File 'lib/ger/broker/amqp.rb', line 88

# Logs an authentication failure and stops the EventMachine reactor when
# it is running. Called from #connect's rescue clause for
# ::AMQP::PossibleAuthenticationFailureError.
#
# @param err [Exception] the error caught during the connection attempt
# @return [Object, nil] the result of EM.stop, or nil when the reactor is
#   not running
def auth_failure(err)
  # Interpolate the +err+ parameter. The previous body referenced +afe+,
  # a name that does not exist in this method, so every call raised
  # NameError before anything was logged or the reactor was stopped.
  GER.logger.error "#{HEADER} Authentication failed, as expcted, caught #{err.inspect}"
  EM.stop if EM.reactor_running?
end

#channel ⇒ Object



60
61
62
63
64
65
66
# File 'lib/ger/broker/amqp.rb', line 60

# Returns the channel of the current exchange.
#
# @return [Object, nil] @exchange.channel, or nil while no exchange has
#   been created yet
def channel
  return nil unless @exchange

  @exchange.channel
end

#conn_failure(err) ⇒ Object



83
84
85
86
# File 'lib/ger/broker/amqp.rb', line 83

# Logs a failed TCP connection attempt and stops the EventMachine reactor
# when it is running. Called from #connect's rescue clause for
# ::AMQP::TCPConnectionFailed.
#
# @param err [Object] the caught failure; accepted but not used here
# @return [Object, nil] the result of EM.stop, or nil when the reactor is
#   not running
def conn_failure(err)
  message = "#{HEADER} TCP connection failed, as expcted."
  GER.logger.error(message)
  return unless EM.reactor_running?

  EM.stop
end

#conn_intp(connection) ⇒ Object



100
101
102
103
104
105
# File 'lib/ger/broker/amqp.rb', line 100

# Connection-interruption callback (registered in #connect through
# on_connection_interruption). Does nothing unless the connection reports
# an error; otherwise logs a warning and asks it to reconnect.
#
# @param connection [Object] object responding to #error? and #reconnect
# @return [Object, nil] the result of connection.reconnect, or nil when
#   the connection reports no error
def conn_intp(connection)
  return unless connection.error?

  GER.logger.warn("#{HEADER} Connection interrupton. reconnectiong...")
  # Arguments are passed through unchanged; presumably (force, period in
  # seconds) -- confirm against the amqp gem's reconnect signature.
  connection.reconnect(false, 1)
end

#conn_loss(connection, settings) ⇒ Object



93
94
95
96
97
98
# File 'lib/ger/broker/amqp.rb', line 93

# TCP-connection-loss callback (registered in #connect through
# on_tcp_connection_loss). Does nothing unless the connection reports an
# error; otherwise logs a warning and asks it to reconnect.
#
# @param connection [Object] object responding to #error? and #reconnect
# @param settings [Object] supplied by the callback; not used here
# @return [Object, nil] the result of connection.reconnect, or nil when
#   the connection reports no error
def conn_loss(connection, settings)
  return unless connection.error?

  GER.logger.warn("#{HEADER} Connection lost. reconnectiong...")
  # Arguments are passed through unchanged; presumably (force, period in
  # seconds) -- confirm against the amqp gem's reconnect signature.
  connection.reconnect(false, 1)
end

#connect(&block) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/ger/broker/amqp.rb', line 33

# Opens the AMQP connection for @broker.uri and registers the
# loss / interruption / recovery callbacks on it.
#
# Inside the block handed to ::AMQP.connect, a channel and exchange are
# created via #generate_channel and the optional caller block is invoked
# with this broker object.
#
# @yieldparam broker [AMQP] self
# @return [self, Object] self when no error is raised; otherwise the
#   return value of #auth_failure or #conn_failure
def connect(&block)
  @connection = ::AMQP.connect(@broker.uri) do |conn|
    generate_channel(conn)
    block.call(self) if block
  end

  @connection.on_tcp_connection_loss(&method(:conn_loss))
  @connection.on_connection_interruption(&method(:conn_intp))
  # A recovered connection gets a new channel and exchange.
  @connection.on_recovery(&method(:generate_channel))

  self
rescue ::AMQP::PossibleAuthenticationFailureError => auth_error
  auth_failure(auth_error)
rescue ::AMQP::TCPConnectionFailed => tcp_error
  conn_failure(tcp_error)
end

#generate_channel(connection = nil) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/ger/broker/amqp.rb', line 68

# Creates a new channel on the given connection and declares the broker's
# exchange on it, storing the exchange in @exchange.
#
# @param connection [Object, nil] connection to open the channel on;
#   falls back to @connection when nil
# @return [Object] the newly created ::AMQP::Exchange
def generate_channel(connection = nil)
  amqp_channel = ::AMQP::Channel.new(connection || @connection)
  amqp_channel.auto_recovery = true
  # Channel-level errors are raised using the reply text of the close frame.
  amqp_channel.on_error { |_ch, channel_close| raise channel_close.reply_text }

  exchange_type = @broker.exchange['type'].to_sym
  exchange_name = @broker.exchange['name']
  @exchange = ::AMQP::Exchange.new(amqp_channel, exchange_type, exchange_name,
                                   :durable => true,
                                   :auto_delete => false)
end

#send(data, param) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/ger/broker/amqp.rb', line 52

# Publishes +data+ to the current exchange.
#
# The current epoch second is written into +param+ under :timestamp (the
# caller's hash is modified), then +param+ is merged over the default
# headers to form the publish options. The block given to publish logs
# the timestamp, routing key and payload.
#
# NOTE: because this method is named +send+, it shadows Object#send on
# instances; use +__send__+ or +public_send+ for dynamic dispatch.
#
# @param data [Object] payload handed to @exchange.publish
# @param param [Hash] per-message options such as :routing_key
# @return [Object] whatever @exchange.publish returns
def send(data, param)
  param[:timestamp] = Time.now.to_i
  options = @headers.merge(param)

  @exchange.publish(data, options) do
    routing = param[:routing_key] || 'All'
    GER.logger.info "#{HEADER} Published time: #{param[:timestamp]}, routing: #{routing}"
    GER.logger.debug "#{HEADER} Published content: #{data}"
  end
end