Class: LogStash::Inputs::Relp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/relp.rb

Overview

Read RELP events over a TCP socket.

For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>

This protocol implements application-level acknowledgements to help protect against message loss.

Message acks only cover delivery up to the point where a message is put into the queue for filters; anything lost after that point will not be retransmitted.

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Relp

Returns a new instance of Relp.



48
49
50
51
52
53
54
55
# File 'lib/logstash/inputs/relp.rb', line 48

# Builds the input and makes sure plain and TLS sockets expose peer information.
#
# @param args forwarded unchanged to the parent constructor
def initialize(*args)
  super
  # No listener yet; it is assigned later, outside the constructor.
  @relp_server = nil

  # Mix the socket peer helper into both socket classes (monkey patch).
  TCPSocket.send(:include, ::LogStash::Util::SocketPeer)
  OpenSSL::SSL::SSLSocket.send(:include, ::LogStash::Util::SocketPeer)
end

Instance Method Details

#close ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/logstash/inputs/relp.rb', line 152

# Shuts down the RELP server, if one exists, and forgets the reference.
#
# The server is created in register, so it has to be released here as well:
# the plugin may be registered and then closed without run or stop ever
# being called.
#
# @return [nil]
def close
  return unless @relp_server

  begin
    @relp_server.shutdown
  rescue StandardError
    # best effort: errors raised while shutting down are deliberately discarded
    nil
  end
  @relp_server = nil
end

#register ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/logstash/inputs/relp.rb', line 58

# Announces the listener, prepares SSL when it is enabled, and creates the
# RELP server for the configured host and port.
#
# @return [RelpServer] the server that was just created
def register
  listen_address = "#{@host}:#{@port}"
  @logger.info("Starting relp input listener", :address => listen_address)

  if @ssl_enable
    initialize_ssl_context

    # Encryption without certificate verification gets a loud warning.
    unsafe_ssl_notice = [
      "** WARNING ** Detected UNSAFE options in relp input configuration!",
      "** WARNING ** You have enabled encryption but DISABLED certificate verification.",
      "** WARNING ** To make sure your data is secure change :ssl_verify to true"
    ]
    @logger.warn(unsafe_ssl_notice.join("\n")) if @ssl_verify == false
  end

  # The socket is opened here (through @relp_server), so close must release
  # it too, not only stop: register can be called without run & stop ever
  # being called afterwards.
  @relp_server = RelpServer.new(@host, @port, ['syslog'], @ssl_context)
end

#run(output_queue) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/inputs/relp.rb', line 120

# Accept loop: keeps taking connections from @relp_server and hands each one
# to connection_thread until stop? reports true.
#
# Failures while accepting a client are logged and the loop continues; any
# other StandardError is re-raised unless the plugin is stopping.
#
# @param output_queue passed through to connection_thread
# @return [nil]
def run(output_queue)
  until stop?
    begin
      server, socket = @relp_server.accept
      connection_thread(server, socket, output_queue)
    rescue Relp::InvalidCommand, Relp::InappropriateCommand => bad_open
      @logger.warn('Relp client trying to open connection with something other than open:' + bad_open.message)
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue Relp::ConnectionClosed
      @logger.debug('Relp Connection closed')
    rescue OpenSSL::SSL::SSLError => ssl_failure
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssl_failure, :backtrace => ssl_failure.backtrace)
    rescue StandardError => unexpected
      # an exception that occurred while the plugin is stopping is ignored
      # so the loop can exit; anything else is propagated to the caller
      next if stop?

      raise unexpected
    end
  end
end

#stop ⇒ Object




142
143
144
145
146
147
148
149
150
# File 'lib/logstash/inputs/relp.rb', line 142

# Force-closes the RELP server so that blocking reads on it are interrupted
# with an IO exception and the threads using them can exit.
#
# @return [nil]
def stop
  begin
    @relp_server&.shutdown
  rescue StandardError
    # catch-all: close errors or an already invalid socket are discarded
    nil
  end
  @relp_server = nil
end