Class: LogStash::Inputs::Relp

Inherits:
Base show all
Defined in:
lib/logstash/inputs/relp.rb

Overview

Read RELP events over a TCP socket.

For more information about RELP, see <http://www.rsyslog.com/doc/imrelp.html>.

This protocol implements application-level acknowledgements to help protect against message loss.

Message acknowledgements only cover delivery of a message into the queue for filters; anything lost after that point will not be retransmitted.

Defined Under Namespace

Classes: Interrupted

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes inherited from Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#tag

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

#initialize(*args) ⇒ Relp

Returns a new instance of Relp.



32
33
34
# File 'lib/logstash/inputs/relp.rb', line 32

# Constructs the input by forwarding every constructor argument, unchanged,
# to the superclass initializer. No state is set up here.
def initialize(*args)
  # Bare `super` re-sends the arguments this method received.
  super
end

Instance Method Details

#register ⇒ Object



37
38
39
40
# File 'lib/logstash/inputs/relp.rb', line 37

# Logs the address the listener is being started on, then creates the
# RelpServer for @host/@port and stores it in @relp_server.
def register
  listen_address = "#{@host}:#{@port}"
  @logger.info("Starting relp input listener", :address => listen_address)

  # NOTE(review): the third argument is presumably the list of RELP commands
  # the server requires from clients -- confirm against RelpServer.
  commands = ['syslog']
  @relp_server = RelpServer.new(@host, @port, commands)
end

#run(output_queue) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/relp.rb', line 60

# Accept loop for the RELP listener; runs until teardown interrupts it.
#
# Every connection returned by @relp_server.accept is served on its own
# thread by relp_stream. The connection's socket is closed when that thread
# finishes, whether relp_stream returned or raised.
#
# output_queue - handed through unchanged to relp_stream.
#
# Re-raises IOError / Interrupted when @interrupted is not set, i.e. when the
# failure was not requested through teardown.
def run(output_queue)
  # Recorded so teardown can raise Interrupted inside this thread.
  @thread = Thread.current
  loop do
    begin
      # Start a new thread for each connection. accept itself is evaluated on
      # this (listener) thread, so its errors reach the rescues further down.
      Thread.start(@relp_server.accept) do |client|
        rs, socket = client
        begin
          # monkeypatch a 'peer' method onto the socket.
          socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          peer = socket.peer
          @logger.debug("Relp Connection to #{peer} created")
          relp_stream(rs, socket, output_queue, peer)
        rescue Relp::ConnectionClosed
          @logger.debug("Relp Connection to #{peer} Closed")
        rescue Relp::RelpError => e
          @logger.warn("Relp error: #{e.class} #{e.message}")
          #TODO: Still not happy with this, are they all warn level?
          #Will this catch everything I want it to?
        ensure
          # The Relp spec says to close the connection on error; closing here
          # also stops the socket leaking when the stream ends or the peer
          # lookup above fails.
          socket.close unless socket.closed?
        end
      end # Thread.start
    rescue Relp::InvalidCommand, Relp::InappropriateCommand => e
      @logger.warn("Relp client trying to open connection with something other than open:#{e.message}")
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue IOError, Interrupted
      # Anything other than an intended shutdown is a genuine error caused by
      # something else, so propagate it up.
      raise unless @interrupted

      # Intended shutdown, get out of the loop
      @relp_server.shutdown
      break
    end
  end # loop
end

#teardown ⇒ Object

def run



100
101
102
103
# File 'lib/logstash/inputs/relp.rb', line 100

# Requests shutdown of the listener: sets @interrupted so that run treats the
# interruption as intended, then raises Interrupted in the thread run recorded
# in @thread.
def teardown
  @interrupted = true
  # @thread is only assigned once run has started; with no recorded thread
  # there is nothing to interrupt, so the flag alone is set.
  @thread.raise(Interrupted.new) if @thread
end