Module: LogStash::Inputs::RabbitMQ::BunnyImpl

Included in:
LogStash::Inputs::RabbitMQ
Defined in:
lib/logstash/inputs/rabbitmq/bunny.rb

Instance Method Summary collapse

Instance Method Details

#consume ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 101

# Subscribes to the declared queue and blocks the calling thread while
# deliveries are decoded, decorated and pushed onto @output_queue.
#
# Does nothing when the plugin is already terminating (in that case setup
# returns before a queue has been declared, so there is nothing to consume).
#
# Reads @q, @ch, @codec, @ack and @output_queue; stores the subscribed
# consumer in @consumer so that it can be cancelled later.
def consume
  return if terminating?

  @logger.info("Will consume events from queue #{@q.name}")

  # We need both to block the caller and to keep a reference to the
  # consumer so that we can cancel it, so we build the consumer manually
  # and register that exact instance with the queue. MK.
  #
  # The fourth argument is no_ack: manual acknowledgements are requested
  # exactly when @ack is set.
  @consumer = Bunny::Consumer.new(@ch, @q, @ch.generate_consumer_tag, !@ack)
  @consumer.on_delivery do |delivery_info, _properties, data|
    @codec.decode(data) do |event|
      decorate(event)
      @output_queue << event
    end

    # Acknowledge only after every event decoded from the payload was queued.
    @ch.acknowledge(delivery_info.delivery_tag) if @ack
  end

  @q.subscribe_with(@consumer, :block => true)
end

#register ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 4

# Loads Bunny and derives the connection settings from the plugin
# configuration.
#
# Populates @settings (the options hash later handed to Bunny.new) and
# @connection_url (a human-readable URL used only for logging), filling in
# defaults for the vhost, port, routing key, user and password.
def register
  # Loaded lazily so the gem is only required when this implementation is used.
  require "bunny"

  # "/" is the default virtual host of a RabbitMQ broker.
  @vhost       ||= "/"
  # 5672. Will be switched to 5671 by Bunny if TLS is enabled.
  @port        ||= AMQ::Protocol::DEFAULT_PORT
  @routing_key ||= "#"

  @settings = {
    :vhost => @vhost,
    :host  => @host,
    :port  => @port,
    :automatically_recover => false
  }
  @settings[:user]      = @user || Bunny::DEFAULT_USER
  @settings[:pass]      = @password ? @password.value : Bunny::DEFAULT_PASSWORD
  @settings[:log_level] = @debug ? :debug : :error

  # TLS options are only passed on when they are enabled.
  @settings[:tls]        = @ssl if @ssl
  @settings[:verify_ssl] = @verify_ssl if @verify_ssl

  # "amqps" is the TLS scheme, "amqp" the plain one.
  proto           = @ssl ? "amqps" : "amqp"
  @connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{@vhost}/#{@queue}"

  @logger.info("Registering input #{@connection_url}")
end

#run(output_queue) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 44

# Connects to RabbitMQ and consumes messages into +output_queue+,
# reconnecting after connection-level failures.
#
# @param output_queue [#<<] receives every decoded event
#
# On a Bunny network/connection error the method logs the failure, waits
# twice Bunny's default network recovery interval, and starts over with a
# fresh setup. It stops retrying once the plugin is terminating.
def run(output_queue)
  @output_queue = output_queue

  begin
    setup
    consume
  rescue Bunny::NetworkFailure, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::TCPConnectionFailed => e
    n = Bunny::Session::DEFAULT_NETWORK_RECOVERY_INTERVAL * 2

    # Because we manually reconnect instead of letting Bunny
    # handle failures,
    # make sure we don't leave any consumer work pool
    # threads behind. MK.
    #
    # @ch is still nil when the failure happened before a channel was opened.
    @ch.maybe_kill_consumer_work_pool! if @ch
    @logger.error("RabbitMQ connection error: #{e.message}. Will attempt to reconnect in #{n} seconds...")

    # Do not keep reconnecting while the plugin is shutting down.
    return if terminating?

    sleep n
    retry
  end
end

#setup ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 75

# Opens the connection and channel, declares the queue and optionally binds
# it to an exchange.
#
# Sets @conn, @ch, @arguments_hash and @q. Returns early, before connecting,
# when the plugin is already terminating.
def setup
  @conn = Bunny.new(@settings)

  # Leave the password out of the log line.
  loggable_settings = Hash(@settings).reject { |key, _| key == :pass }
  @logger.debug("Connecting to RabbitMQ. Settings: #{loggable_settings.inspect}, queue: #{@queue.inspect}")
  return if terminating?
  @conn.start

  @ch = @conn.create_channel.tap do |ch|
    ch.prefetch(@prefetch_count)
  end
  @logger.info("Connected to RabbitMQ at #{@settings[:host]}")

  # Queue arguments may be configured as a flat [key, value, ...] list or
  # already be a hash; the queue declaration below gets the hash form.
  @arguments_hash = if @arguments.respond_to?(:to_hash)
                      @arguments.to_hash
                    else
                      Hash[*Array(@arguments)]
                    end

  @q = @ch.queue(@queue,
                 :durable     => @durable,
                 :auto_delete => @auto_delete,
                 :exclusive   => @exclusive,
                 :arguments   => @arguments_hash)

  # exchange binding is optional for the input
  @q.bind(@exchange, :routing_key => @key) if @exchange
end

#teardown ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 65

# Releases the RabbitMQ resources held by this input and signals completion.
#
# Cancels the consumer, deletes the queue unless it is durable, closes the
# channel and the connection when they are open, then calls +finished+.
# Every step is skipped when the corresponding object was never created, so
# tearing down an input that never got as far as subscribing still reaches
# +finished+.
def teardown
  @consumer.cancel if @consumer
  @q.delete if @q && !@durable

  @ch.close   if @ch && @ch.open?
  @conn.close if @conn && @conn.open?

  finished
end