Class: LogjamAgent::AMQPForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/logjam_agent/amqp_forwarder.rb

Constant Summary collapse

RETRY_AFTER =
10.seconds

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, env, opts = {}) ⇒ AMQPForwarder

Returns a new instance of AMQPForwarder.



8
9
10
11
12
13
# File 'lib/logjam_agent/amqp_forwarder.rb', line 8

# Builds a forwarder for the given application/environment pair.
#
# app  - application identifier, interpolated into the default exchange
#        name and routing key.
# env  - environment identifier, interpolated the same way.
# opts - Hash of overrides merged on top of default_options(app, env).
#
# The connection (@bunny) and the exchange start out unset.
def initialize(app, env, opts = {})
  @app, @env = app, env
  # default_options returns a fresh hash, so merging in place is safe.
  @config = default_options(app, env).merge!(opts)
  @bunny = nil
  @exchange = nil
end

Instance Attribute Details

#app ⇒ Object (readonly)

Returns the value of attribute app.



6
7
8
# File 'lib/logjam_agent/amqp_forwarder.rb', line 6

# Read-only accessor: returns the value held in @app.
def app
  @app
end

#env ⇒ Object (readonly)

Returns the value of attribute env.



6
7
8
# File 'lib/logjam_agent/amqp_forwarder.rb', line 6

# Read-only accessor: returns the value held in @env.
def env
  @env
end

Instance Method Details

#default_options(app, env) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/logjam_agent/amqp_forwarder.rb', line 15

# Returns a new Hash with the default connection/exchange settings.
#
# app - interpolated into the exchange name and the routing key.
# env - interpolated into the exchange name and the routing key.
#
# Keys: :host, :exchange, :exchange_durable, :exchange_auto_delete,
# :routing_key.
def default_options(app, env)
  exchange_name = "request-stream-#{app}-#{env}"
  routing_key   = "logs.#{app}.#{env}"

  {
    :host                 => "localhost",
    :exchange             => exchange_name,
    :exchange_durable     => true,
    :exchange_auto_delete => false,
    :routing_key          => routing_key
  }
end

#forward(msg, engine) ⇒ Object

TODO: guard this method with a mutex.



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/logjam_agent/amqp_forwarder.rb', line 26

# Publishes msg on the exchange as a non-persistent message.
#
# msg    - payload handed to exchange.publish unchanged.
# engine - optional suffix; when truthy, ".<engine>" is appended to the
#          configured routing key.
#
# Does nothing (returns nil) while paused? is true or LogjamAgent.disabled
# is set. Any exception raised while building the key or publishing is
# first offered to reraise_expectation_errors! and then handed to pause.
def forward(msg, engine)
  return if paused?
  return if LogjamAgent.disabled

  begin
    base_key = @config[:routing_key]
    # String#+ builds a new string, so the configured key is never mutated.
    routing_key = engine ? base_key + ".#{engine}" : base_key
    exchange.publish(msg, :key => routing_key, :persistent => false)
  rescue Exception => error
    # Catches everything on purpose: the expectation-error hook runs
    # first, and whatever it does not re-raise pauses the forwarder.
    reraise_expectation_errors!
    pause(error)
  end
end

#reset(exception = nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/logjam_agent/amqp_forwarder.rb', line 39

# Tears down the current connection, if there is one.
#
# exception - when given, the connection's (non-public) close_socket is
#             invoked via __send__; otherwise the connection is asked to
#             stop.
#
# Errors raised during teardown are swallowed, and @bunny and @exchange
# are cleared in every case. Returns nil when there was no connection.
def reset(exception=nil)
  connection = @bunny
  return unless connection

  begin
    exception ? connection.__send__(:close_socket) : connection.stop
  rescue Exception
    # If the connection raises here, it's not usable anymore anyway.
    nil
  ensure
    @bunny = nil
    @exchange = nil
  end
end