Class: LogStash::Inputs::Relp
- Inherits: Base
  - Object
  - Base
  - LogStash::Inputs::Relp
- Defined in: lib/logstash/inputs/relp.rb
Overview
Read RELP events over a TCP socket.
For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>.
This protocol implements application-level acknowledgements to help protect against message loss.
Acknowledgements cover a message only up to the point where it is placed on the queue for filters; anything lost after that point will not be retransmitted.
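The acknowledgement boundary described above can be illustrated with a minimal sketch. It assumes the ordering "enqueue first, acknowledge second"; `SketchSession` and `receive_frame` are hypothetical names invented for this illustration and are not part of the plugin's API.

```ruby
require "thread"

# Hypothetical stand-in for a RELP session; it only records which
# transaction numbers were acknowledged.
class SketchSession
  attr_reader :acked

  def initialize
    @acked = []
  end

  def ack(txnr)
    @acked << txnr
  end
end

# Assumed ordering: hand the message to the filter queue, then acknowledge.
# Once the ack is sent the sender will not retransmit, so a message lost
# further down the pipeline is not recovered by RELP.
def receive_frame(session, queue, txnr, message)
  queue << message
  session.ack(txnr)
end

session = SketchSession.new
queue = Queue.new
receive_frame(session, queue, 1, "hello")
puts "queued: #{queue.size}, acked: #{session.acked.inspect}"
```

The point of the sketch is only the ordering: the sender's guarantee ends at the `queue <<` step.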
Instance Method Summary
- #close ⇒ Object
- #initialize(*args) ⇒ Relp (constructor): A new instance of Relp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #stop ⇒ Object: Shuts down the RELP server so that blocked reads and the threads using them exit.
Constructor Details
#initialize(*args) ⇒ Relp
Returns a new instance of Relp.
# File 'lib/logstash/inputs/relp.rb', line 48

def initialize(*args)
  super(*args)
  @relp_server = nil

  # monkey patch TCPSocket and SSLSocket to include socket peer
  TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
  OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}
end
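The constructor mixes a module into existing socket classes each time an instance is created. The following sketch shows why repeating that call is harmless in Ruby. `PeerSketch` and `PlainConnection` are hypothetical names used only here; they stand in for `::LogStash::Util::SocketPeer` and the socket classes, whose internals are not shown in this page.

```ruby
# Hypothetical mixin; the real SocketPeer module is not reproduced here.
module PeerSketch
  def peer_label
    "#{self.class.name} peer"
  end
end

# Hypothetical class standing in for TCPSocket or SSLSocket.
class PlainConnection; end

# module_eval evaluates the block with the class as self, so the
# include applies to every existing and future instance.
PlainConnection.module_eval { include PeerSketch }

# Including the same module a second time is a no-op.
PlainConnection.module_eval { include PeerSketch }

puts PlainConnection.new.peer_label
puts PlainConnection.ancestors.count(PeerSketch)
```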
Instance Method Details
#close ⇒ Object
# File 'lib/logstash/inputs/relp.rb', line 152

def close
  # see related comment in register: we must make sure to close the @relp_server here
  # because it is created in the register method and we could be in the context of having
  # register called but never run & stop, only close.
  if @relp_server
    @relp_server.shutdown rescue nil
    @relp_server = nil
  end
end
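A small sketch of the same guard pattern, under the assumption that shutting down the server may raise. `FlakyServer` and `InputSketch` are hypothetical classes written for this illustration. It exercises the "register called but never run" case mentioned in the comment and shows that a second `close` does nothing.

```ruby
# Hypothetical server whose shutdown always raises, to exercise the rescue.
class FlakyServer
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
    raise IOError, "socket already closed"
  end
end

# Hypothetical input with the same register/close shape as the plugin.
class InputSketch
  attr_reader :server

  def register
    @server = FlakyServer.new
  end

  def close
    if @server
      # The rescue modifier swallows any StandardError raised by shutdown.
      @server.shutdown rescue nil
      @server = nil
    end
  end
end

input = InputSketch.new
input.register
created = input.server
input.close   # register was called, run never was
input.close   # second call is a no-op because @server is already nil
puts created.shutdown_calls
```

Setting the instance variable to `nil` after the shutdown is what makes the method safe to call from both `stop` and `close`.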
#register ⇒ Object
# File 'lib/logstash/inputs/relp.rb', line 58

def register
  @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")

  if @ssl_enable
    initialize_ssl_context
    if @ssl_verify == false
      @logger.warn [
        "** WARNING ** Detected UNSAFE options in relp input configuration!",
        "** WARNING ** You have enabled encryption but DISABLED certificate verification.",
        "** WARNING ** To make sure your data is secure change :ssl_verify to true"
      ].join("\n")
    end
  end

  # note that since we are opening a socket (through @relp_server) in register,
  # we must also make sure we close it in the close method even if we also close
  # it in the stop method since we could have a situation where register is called
  # but not run & stop.
  @relp_server = RelpServer.new(@host, @port,['syslog'], @ssl_context)
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/relp.rb', line 120

def run(output_queue)
  while !stop?
    begin
      server, socket = @relp_server.accept
      connection_thread(server, socket, output_queue)
    rescue Relp::InvalidCommand,Relp::InappropriateCommand => e
      @logger.warn('Relp client trying to open connection with something other than open:'+e.)
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue Relp::ConnectionClosed
      @logger.debug('Relp Connection closed')
    rescue OpenSSL::SSL::SSLError => ssle
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
    rescue => e
      # if this exception occurred while the plugin is stopping
      # just ignore and exit
      raise e unless stop?
    end
  end
end
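The control flow of this loop can be sketched without sockets. The example below is an assumption-laden illustration: `HandshakeError` and `AcceptLoopSketch` are hypothetical, and each "accept" is simulated by a scripted lambda. It shows the three outcomes the listing handles: a normal connection, a known protocol error that is logged and skipped, and an arbitrary error that is swallowed only when the plugin is stopping.

```ruby
# Hypothetical error standing in for the Relp handshake exceptions.
class HandshakeError < StandardError; end

class AcceptLoopSketch
  attr_reader :handled, :log

  # script: array of callables, each simulating one accept call.
  def initialize(script)
    @script = script
    @stopping = false
    @handled = []
    @log = []
  end

  def stop?
    @stopping
  end

  def stop
    @stopping = true
  end

  def run
    until stop?
      begin
        step = @script.shift
        if step.nil?
          # Simulate stop being requested and the listener being closed
          # underneath a blocking accept.
          stop
          raise IOError, "listener closed"
        end
        @handled << step.call
      rescue HandshakeError => e
        @log << "bad handshake: #{e.message}"
      rescue => e
        # Errors caused by shutting down are ignored; anything else propagates.
        raise e unless stop?
      end
    end
  end
end

sketch = AcceptLoopSketch.new([
  -> { :client_a },
  -> { raise HandshakeError, "expected open" },
  -> { :client_b }
])
sketch.run
p sketch.handled
p sketch.log
```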
#stop ⇒ Object
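Force-closes @relp_server so that any blocking read escapes with an IO exception and the threads using it exit.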
# File 'lib/logstash/inputs/relp.rb', line 142

def stop
  # force close @relp_server which will escape any blocking read with a IO exception
  # and any thread using them will exit.
  # catch all rescue nil to discard any close errors or invalid socket
  if @relp_server
    @relp_server.shutdown rescue nil
    @relp_server = nil
  end
end
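The "force close to escape a blocking read" technique can be demonstrated in isolation. This sketch uses `IO.pipe` instead of a RELP server and assumes the behaviour of the standard Ruby interpreter (MRI), where closing an IO object raises `IOError` in a thread blocked on it; other runtimes may differ. The method name `blocked_read_outcome` is hypothetical.

```ruby
# Returns :interrupted if closing the reader unblocked the worker thread
# with an IOError, or :returned if the read came back normally.
def blocked_read_outcome
  reader, writer = IO.pipe
  outcome = nil

  worker = Thread.new do
    begin
      reader.read          # blocks: nothing is written and the writer stays open
      outcome = :returned
    rescue IOError
      outcome = :interrupted
    end
  end

  sleep 0.1                # give the worker time to block in read
  reader.close             # force close from another thread
  worker.join(2)
  outcome
ensure
  writer.close if writer && !writer.closed?
end

puts blocked_read_outcome
```

This is why `stop` does not need to signal each connection thread individually: once the underlying IO objects are closed, their blocked reads fail and the threads unwind on their own.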