Class: LogStash::Inputs::Relp
Defined in: lib/logstash/inputs/relp.rb
Overview
Read RELP events over a TCP socket.
For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>.
This protocol implements application-level acknowledgements to help protect against message loss.
Acknowledgements only cover delivery up to the point where a message is placed on the queue for filters; anything lost after that point will not be retransmitted.
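The acknowledgement boundary described above can be illustrated with a minimal sketch. The helper `enqueue_then_ack`, the `acks` array, and the transaction numbers are hypothetical and not part of LogStash::Inputs::Relp; the sketch only assumes that an event is acknowledged once it has been pushed onto the output queue.

```ruby
require 'thread'

# Hypothetical helper (not part of the plugin): enqueue first, acknowledge second.
def enqueue_then_ack(output_queue, acks, txnr, message)
  output_queue << message # hand the event to the queue read by filters
  acks << txnr            # only now record the acknowledgement for the sender
end

output_queue = Queue.new
acks = []

enqueue_then_ack(output_queue, acks, 1, 'first event')
enqueue_then_ack(output_queue, acks, 2, 'second event')
```

Once an acknowledgement has been recorded, the sender has no reason to resend the message, so a failure in a later filter or output stage is not covered by this mechanism.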
Defined Under Namespace
Classes: Interrupted
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes inherited from Base
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
- #initialize(*args) ⇒ Relp (constructor)
  A new instance of Relp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
  Marks the shutdown as intended and raises Interrupted in the thread running #run.
Methods inherited from Base
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
#initialize(*args) ⇒ Relp
Returns a new instance of Relp.
    # File 'lib/logstash/inputs/relp.rb', line 32
    def initialize(*args)
      super(*args)
    end
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/relp.rb', line 37
    def register
      @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")
      @relp_server = RelpServer.new(@host, @port,['syslog'])
    end
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/relp.rb', line 60
    def run(output_queue)
      @thread = Thread.current
      loop do
        begin
          # Start a new thread for each connection.
          Thread.start(@relp_server.accept) do |client|
            rs = client[0]
            socket = client[1]
            # monkeypatch a 'peer' method onto the socket.
            socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
            peer = socket.peer
            @logger.debug("Relp Connection to #{peer} created")
            begin
              relp_stream(rs,socket, output_queue, peer)
            rescue Relp::ConnectionClosed => e
              @logger.debug("Relp Connection to #{peer} Closed")
            rescue Relp::RelpError => e
              @logger.warn('Relp error: '+e.class.to_s+' '+e.message)
              #TODO: Still not happy with this, are they all warn level?
              #Will this catch everything I want it to?
              #Relp spec says to close connection on error, ensure this is the case
            end
          end # Thread.start
        rescue Relp::InvalidCommand,Relp::InappropriateCommand => e
          @logger.warn('Relp client trying to open connection with something other than open:'+e.message)
        rescue Relp::InsufficientCommands
          @logger.warn('Relp client incapable of syslog')
        rescue IOError, Interrupted
          if @interrupted
            # Intended shutdown, get out of the loop
            @relp_server.shutdown
            break
          else
            # Else it was a genuine IOError caused by something else, so propagate it up..
            raise
          end
        end
      end # loop
    end
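The thread-per-connection accept loop used by #run can be tried in isolation with a minimal sketch. It swaps in a plain `TCPServer` from the Ruby standard library in place of `RelpServer`, so nothing here speaks RELP; the `received` queue and the `acceptor` thread are names assumed for the illustration.

```ruby
require 'socket'
require 'thread'

# Assumption: a plain TCP listener on an ephemeral port stands in for RelpServer.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
received = Queue.new

acceptor = Thread.new do
  loop do
    begin
      # accept blocks until a client connects; each client gets its own thread.
      Thread.start(server.accept) do |socket|
        begin
          received << socket.gets.to_s.chomp
        ensure
          socket.close
        end
      end
    rescue IOError, Errno::EBADF
      break # the listening socket was closed, so stop accepting
    end
  end
end

client = TCPSocket.new('127.0.0.1', port)
client.puts('hello')
client.close

first_line = received.pop # blocks until the connection thread has read the line
server.close              # unblocks the acceptor, which then leaves its loop
acceptor.join(1)
```

Passing the accepted socket as an argument to `Thread.start` gives each thread its own reference, so a later iteration of the loop cannot overwrite it.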
#teardown ⇒ Object
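Sets @interrupted and raises Interrupted in the thread that is executing #run, so that the rescue clause there can tell an intended shutdown from a genuine IOError.
    # File 'lib/logstash/inputs/relp.rb', line 100
    def teardown
      @interrupted = true
      @thread.raise(Interrupted.new)
    end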
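The shutdown handshake between #teardown and #run can be sketched on its own. The `Worker` class, its `stopped_cleanly` reader, and the bare `sleep` standing in for a blocking accept are assumptions made for the illustration; only the flag-then-`Thread#raise` pattern is taken from the listing above.

```ruby
# Hypothetical stand-ins for the plugin and its Interrupted class.
class Interrupted < StandardError; end

class Worker
  attr_reader :stopped_cleanly

  def initialize
    @interrupted = false
    @stopped_cleanly = false
  end

  def run
    @thread = Thread.current
    loop do
      begin
        sleep # stand-in for a blocking accept call
      rescue Interrupted
        if @interrupted
          @stopped_cleanly = true # intended shutdown, leave the loop
          break
        else
          raise # not requested by teardown, so propagate it
        end
      end
    end
  end

  def teardown
    @interrupted = true            # set the flag before interrupting
    @thread.raise(Interrupted.new) # wake the blocked thread with an exception
  end
end

worker = Worker.new
runner = Thread.new { worker.run }
sleep 0.01 until runner.status == 'sleep' # wait until run is blocked
worker.teardown
runner.join
```

Setting the flag before calling `Thread#raise` matters: the rescue clause reads the flag to decide whether to exit quietly or re-raise.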