Module: LogStash::Inputs::RabbitMQ::MarchHareImpl
- Included in:
- LogStash::Inputs::RabbitMQ
- Defined in:
- lib/logstash/inputs/rabbitmq/march_hare.rb
Overview
MarchHare-based implementation for JRuby
Instance Method Summary
Instance Method Details
#register ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 5

# Prepares the connection settings for this input and logs the URL it will
# consume from.
#
# Fills in defaults for @vhost, @port and @key when they are unset, builds the
# @settings hash from the configured instance variables, and stores a
# human-readable @connection_url that is written to the log.
#
# @return [Object] whatever @logger.info returns
def register
  # Loaded lazily so the client library is only required when this
  # implementation is actually registered.
  require "hot_bunnies"
  require "java"

  @vhost ||= "127.0.0.1"
  # 5672. The original note says the port "will be switched to 5671 by Bunny
  # if TLS is enabled" -- NOTE(review): confirm the same holds for MarchHare.
  @port ||= 5672
  @key ||= "#"

  @settings = {
    :vhost => @vhost,
    :host => @host,
    :port => @port,
    :user => @user,
    :automatic_recovery => false
  }
  # Optional settings are only added when configured.
  @settings[:pass] = @password.value if @password
  @settings[:tls] = @ssl if @ssl

  # TLS connections use the "amqps" scheme, plain connections use "amqp".
  proto = @ssl ? "amqps" : "amqp"
  @connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{vhost}/#{@queue}"

  @logger.info("Registering input #{@connection_url}")
end
#run(output_queue) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 34

# Runs the consume loop until @break_out_of_the_loop is set, reconnecting
# after a fixed delay whenever the connection fails or #consume returns.
#
# @param output_queue [Object] stored in @output_queue; not used directly by
#   this method
# @return [nil]
def run(output_queue)
  @output_queue = output_queue
  @break_out_of_the_loop = java.util.concurrent.atomic.AtomicBoolean.new(false)
  reconnect_delay = 10

  # MarchHare does not raise exceptions when connection goes down with a blocking
  # consumer running (it uses callbacks, as the RabbitMQ Java client does).
  #
  # However, MarchHare::Channel will make sure to unblock all blocking consumers
  # on any internal shutdown, so #consume will return and another loop iteration
  # will run.
  #
  # This is very similar to how the Bunny implementation works and is sufficient
  # for our needs: it recovers successfully after RabbitMQ is kill -9ed, the
  # network device is shut down, etc. MK.
  until @break_out_of_the_loop.get
    begin
      setup
      consume
    rescue MarchHare::Exception, java.lang.Throwable, com.rabbitmq.client.AlreadyClosedException => e
      @logger.error("RabbitMQ connection error: #{e}. Will reconnect in #{reconnect_delay} seconds...")

      sleep reconnect_delay
      # Re-runs the begin block directly, without re-checking the loop condition.
      retry
    rescue LogStash::ShutdownSignal
      # Presumably sets @break_out_of_the_loop so the loop ends -- the helper
      # is defined elsewhere; verify.
      shutdown_consumer
    end

    # Reached only when no connection exception is bound (that rescue always
    # retries): either #consume returned on its own or shutdown was handled.
    # Announce a reconnect only when the loop is really going to run again,
    # and wait for the delay the message promises.
    unless @break_out_of_the_loop.get
      @logger.error("RabbitMQ connection lost. Will reconnect in #{reconnect_delay} seconds...")
      sleep reconnect_delay
    end
  end
end
#teardown ⇒ Object
67 68 69 70 71 72 73 74 75 |
# File 'lib/logstash/inputs/rabbitmq/march_hare.rb', line 67

# Stops the consumer, then releases the queue, channel and connection held in
# @q, @ch and @connection, and finally calls #finished.
#
# @return [Object] whatever #finished returns
def teardown
  shutdown_consumer
  # Non-durable queues are deleted on teardown. @q is nil-checked like @ch and
  # @connection below, so a teardown that happens before a queue was assigned
  # still goes on to close the channel and connection.
  @q.delete if @q && !@durable

  @ch.close if @ch && @ch.open?
  @connection.close if @connection && @connection.open?

  finished
end