Module: LogStash::Inputs::RabbitMQ::BunnyImpl
- Included in:
- LogStash::Inputs::RabbitMQ
- Defined in:
- lib/logstash/inputs/rabbitmq/bunny.rb
Instance Method Summary
Instance Method Details
#consume ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 101
#
# Subscribes to @q and blocks the calling thread while deliveries arrive.
#
# Every payload is handed to @codec; each decoded event is decorated and
# appended to @output_queue. When @ack is truthy the delivery is acknowledged
# on @ch after all events decoded from that payload have been enqueued.
#
# @return [Object] whatever Bunny::Queue#subscribe_with returns
def consume
  @logger.info("Will consume events from queue #{@q.name}")

  # We both need to block the caller and to keep a reference to the consumer
  # so that we can cancel it later, so we have to construct the consumer
  # manually and register that very instance. Calling Bunny::Queue#subscribe
  # with a block would create a second, anonymous consumer and leave
  # @consumer unregistered. MK.
  #
  # The fourth constructor argument is no_ack, i.e. the inverse of manual
  # acknowledgement mode.
  @consumer = Bunny::Consumer.new(@ch, @q, @ch.generate_consumer_tag, !@ack)
  @consumer.on_delivery do |delivery_info, _properties, data|
    @codec.decode(data) do |event|
      decorate(event)
      @output_queue << event
    end

    @ch.acknowledge(delivery_info.delivery_tag) if @ack
  end

  @q.subscribe_with(@consumer, :block => true)
end
#register ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 4
#
# Prepares the Bunny connection settings from the plugin's configuration
# instance variables and logs a connection URL describing the input.
#
# Writes @settings (the Hash later given to Bunny.new) and @connection_url,
# and fills in defaults for @vhost, @port and @routing_key when unset.
def register
  # Required here, when the input is registered, rather than at file load time.
  require "bunny"

  # NOTE(review): the vhost fallback is Bunny's DEFAULT_HOST constant -
  # confirm that this is the intended default for a virtual host.
  @vhost       ||= Bunny::DEFAULT_HOST
  # 5672. Will be switched to 5671 by Bunny if TLS is enabled.
  @port        ||= AMQ::Protocol::DEFAULT_PORT
  @routing_key ||= "#"

  @settings = {
    :vhost => @vhost,
    :host  => @host,
    :port  => @port,
    # Reconnection is driven manually by #run, so Bunny's own recovery is off.
    :automatically_recover => false
  }
  @settings[:user]      = @user || Bunny::DEFAULT_USER
  @settings[:pass]      = @password ? @password.value : Bunny::DEFAULT_PASSWORD
  @settings[:log_level] = @debug ? :debug : :error

  @settings[:tls]        = @ssl if @ssl
  @settings[:verify_ssl] = @verify_ssl if @verify_ssl

  # TLS connections use the "amqps" scheme, plain ones "amqp".
  proto = @ssl ? "amqps" : "amqp"
  @connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{@vhost}/#{@queue}"

  @logger.info("Registering input #{@connection_url}")
end
#run(output_queue) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 44
#
# Entry point of the input: connects, then consumes until interrupted.
#
# On any of the rescued Bunny connection errors the method logs the failure,
# sleeps for twice Bunny's default network recovery interval and starts over
# with a fresh #setup / #consume cycle.
#
# @param output_queue [#<<] destination that decoded events are pushed onto
def run(output_queue)
  @output_queue = output_queue

  begin
    setup
    consume
  rescue Bunny::NetworkFailure, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::TCPConnectionFailed => e
    n = Bunny::Session::DEFAULT_NETWORK_RECOVERY_INTERVAL * 2

    # Because we manually reconnect instead of letting Bunny
    # handle failures,
    # make sure we don't leave any consumer work pool
    # threads behind. MK.
    #
    # @ch is still nil when the very first connection attempt fails before a
    # channel has been opened, hence the guard.
    @ch.maybe_kill_consumer_work_pool! if @ch
    @logger.error("RabbitMQ connection error: #{e.message}. Will attempt to reconnect in #{n} seconds...")

    sleep n
    # Retries without an upper bound: the loop only ends when setup and
    # consume return without raising one of the errors above.
    retry
  end
end
#setup ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 75
#
# Opens the connection and channel, declares the queue and, when an exchange
# is configured, binds the queue to it.
#
# Writes @conn, @ch, @arguments_hash and @q. Returns early, right after the
# connection object has been built and before it is started, when
# terminating? is true; in that case @ch and @q are left untouched.
def setup
  @conn = Bunny.new(@settings)

  # Never write the broker password to the log.
  loggable_settings = @settings.respond_to?(:merge) ? @settings.merge(:pass => "<hidden>") : @settings
  @logger.debug("Connecting to RabbitMQ. Settings: #{loggable_settings.inspect}, queue: #{@queue.inspect}")
  return if terminating?
  @conn.start

  @ch = @conn.create_channel.tap do |ch|
    ch.prefetch(@prefetch_count)
  end
  @logger.info("Connected to RabbitMQ at #{@settings[:host]}")

  # Queue arguments are handed to the queue declaration as a Hash. Accept
  # either a ready-made Hash or a flat [key, value, key, value, ...] list.
  @arguments_hash = @arguments.respond_to?(:to_hash) ? @arguments.to_hash : Hash[*@arguments]

  @q = @ch.queue(@queue,
                 :durable     => @durable,
                 :auto_delete => @auto_delete,
                 :exclusive   => @exclusive,
                 :arguments   => @arguments_hash)

  # exchange binding is optional for the input
  if @exchange
    @q.bind(@exchange, :routing_key => @key)
  end
end
#teardown ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/logstash/inputs/rabbitmq/bunny.rb', line 65
#
# Releases everything this input holds, then calls #finished.
#
# Cancels the consumer, deletes the queue unless it is durable, and closes
# the channel and connection if they are open. Each step is skipped when the
# corresponding object was never created (for example when teardown runs
# before a connection was established), so #finished is still reached.
def teardown
  @consumer.cancel if @consumer
  @q.delete if @q && !@durable

  @ch.close   if @ch && @ch.open?
  @conn.close if @conn && @conn.open?

  finished
end