Module: LogStash::Inputs::RabbitMQ::MarchHareImpl

Included in:
LogStash::Inputs::RabbitMQ
Defined in:
lib/logstash/inputs/rabbitmq/march_hare.rb

Overview

MarchHare-based implementation for JRuby

Instance Method Summary collapse

Instance Method Details

#registerObject



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 5

def register
  require "hot_bunnies"
  require "java"

  @vhost       ||= "127.0.0.1"
  # 5672. Will be switched to 5671 by Bunny if TLS is enabled.
  @port        ||= 5672
  @key         ||= "#"

  @settings = {
    :vhost => @vhost,
    :host  => @host,
    :port  => @port,
    :user  => @user,
    :automatic_recovery => false
  }
  @settings[:pass]      = @password.value if @password
  @settings[:tls]       = @ssl if @ssl

  proto                 = if @ssl
                            "amqp"
                          else
                            "amqps"
                          end
  @connection_url       = "#{proto}://#{@user}@#{@host}:#{@port}#{vhost}/#{@queue}"

  @logger.info("Registering input #{@connection_url}")
end

#run(output_queue) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 34

def run(output_queue)
  @output_queue          = output_queue
  @break_out_of_the_loop = java.util.concurrent.atomic.AtomicBoolean.new(false)

  # MarchHare does not raise exceptions when connection goes down with a blocking
  # consumer running (it uses callbacks, as the RabbitMQ Java client does).
  #
  # However, MarchHare::Channel will make sure to unblock all blocking consumers
  # on any internal shutdown, so #consume will return and another loop iteration
  # will run.
  #
  # This is very similar to how the Bunny implementation works and is sufficient
  # for our needs: it recovers successfully after RabbitMQ is kill -9ed, the
  # network device is shut down, etc. MK.
  until @break_out_of_the_loop.get do
    begin
      setup
      consume
    rescue MarchHare::Exception, java.lang.Throwable, com.rabbitmq.client.AlreadyClosedException => e
      n = 10
      @logger.error("RabbitMQ connection error: #{e}. Will reconnect in #{n} seconds...")

      sleep n
      retry
    rescue LogStash::ShutdownSignal => ss
      shutdown_consumer
    end

    n = 10
    @logger.error("RabbitMQ connection error: #{e}. Will reconnect in #{n} seconds...")
  end
end

#teardownObject



67
68
69
70
71
72
73
74
75
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 67

def teardown
  shutdown_consumer
  @q.delete unless @durable

  @ch.close         if @ch && @ch.open?
  @connection.close if @connection && @connection.open?

  finished
end