Class: LogStash::Inputs::Unix
- Defined in:
- lib/logstash/inputs/unix.rb
Overview
Read events over a UNIX socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
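The two modes can be illustrated with Ruby's standard `socket` library alone. The following is a minimal sketch, not the plugin's implementation: `read_line_events`, `read_as_server`, and `read_as_client` are hypothetical helper names, and an "event" is simplified to a plain String holding one line.

```ruby
require "socket"

# Hypothetical helper: treat every newline-terminated line as one event.
# Reads until the peer closes the connection.
def read_line_events(socket)
  events = []
  socket.each_line { |line| events << line.chomp }
  events
end

# Server-style operation: listen on `path`, accept one client, read its lines.
def read_as_server(path)
  server = UNIXServer.new(path)
  client = server.accept
  read_line_events(client)
ensure
  client.close if client
  server.close if server
  File.unlink(path) if File.exist?(path)
end

# Client-style operation: connect to an existing listener at `path`
# and read whatever it sends.
def read_as_client(path)
  socket = UNIXSocket.new(path)
  read_line_events(socket)
ensure
  socket.close if socket
end
```

In both cases the reading logic is identical; only the way the socket is obtained differs.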
Defined Under Namespace
Classes: Interrupted
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes inherited from Base
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
- #initialize(*args) ⇒ Unix (constructor): A new instance of Unix.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
Methods inherited from Base
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
#initialize(*args) ⇒ Unix
Returns a new instance of Unix.
    # File 'lib/logstash/inputs/unix.rb', line 36

    def initialize(*args)
      super(*args)
    end
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/unix.rb', line 41

    def register
      require "socket"
      require "timeout"

      if server?
        @logger.info("Starting unix input listener", :address => "#{@path}", :force_unlink => "#{@force_unlink}")
        begin
          @server_socket = UNIXServer.new(@path)
        rescue Errno::EADDRINUSE, IOError
          if @force_unlink
            File.unlink(@path)
            begin
              @server_socket = UNIXServer.new(@path)
              return
            rescue Errno::EADDRINUSE, IOError
              @logger.error("!!!Could not start UNIX server: Address in use",
                            :path => @path)
              raise
            end
          end
          @logger.error("Could not start UNIX server: Address in use",
                        :path => @path)
          raise
        end
      end
    end
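The retry behaviour of #register can be reduced to a few lines. This is a sketch under assumptions: `bind_unix_server` and its `force_unlink:` keyword are hypothetical names that mirror the `@force_unlink` setting, and logging is omitted.

```ruby
require "socket"

# Hypothetical helper: try to bind a UNIX server socket at `path`.
# If the path is already taken and `force_unlink` is set, remove the
# existing socket file and try exactly once more.
def bind_unix_server(path, force_unlink: false)
  UNIXServer.new(path)
rescue Errno::EADDRINUSE, IOError
  raise unless force_unlink   # re-raise the original error
  File.unlink(path)           # drop the existing socket file
  UNIXServer.new(path)        # a second failure propagates to the caller
end
```

Closing a `UNIXServer` does not remove its socket file, so a path left over from an earlier run is enough to trigger `Errno::EADDRINUSE`. Note that unlinking removes the path even if another live process is still listening on it.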
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/unix.rb', line 113

    def run(output_queue)
      if server?
        @thread = Thread.current
        @client_threads = []
        loop do
          # Start a new thread for each connection.
          begin
            @client_threads << Thread.start(@server_socket.accept) do |s|
              # TODO(sissel): put this block in its own method.

              @logger.debug("Accepted connection",
                            :server => "#{@path}")
              begin
                handle_socket(s, output_queue)
              rescue Interrupted
                s.close rescue nil
              end
            end # Thread.start
          rescue IOError, Interrupted
            if @interrupted
              # Intended shutdown, get out of the loop
              @server_socket.close
              @client_threads.each do |thread|
                thread.raise(IOError.new)
              end
              break
            else
              # Else it was a genuine IOError caused by something else, so propagate it up..
              raise
            end
          end
        end # loop
      else
        loop do
          client_socket = UNIXSocket.new(@path)
          client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          @logger.debug("Opened connection", :client => @path)
          handle_socket(client_socket, output_queue)
        end # loop
      end
    end
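The server branch of #run follows a thread-per-connection pattern. The sketch below isolates that pattern using only the standard library. `push_lines` is a hypothetical stand-in for `handle_socket` (whose body is not shown on this page), `accept_loop` is likewise an illustrative name, and shutdown handling is deliberately left out.

```ruby
require "socket"

# Hypothetical stand-in for handle_socket: push each line read from the
# client onto the queue, then close the socket.
def push_lines(socket, output_queue)
  socket.each_line { |line| output_queue << line.chomp }
ensure
  socket.close
end

# Accept connections in a loop and hand each one to its own thread, so a
# slow or idle client does not block the others. This loop never returns
# on its own; the caller is responsible for stopping it.
def accept_loop(server, output_queue)
  loop do
    Thread.start(server.accept) do |client|
      push_lines(client, output_queue)
    end
  end
end
```

A caller would typically create the `UNIXServer`, run `accept_loop(server, Queue.new)` in a dedicated thread, and consume events from the queue elsewhere.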
#teardown ⇒ Object
    # File 'lib/logstash/inputs/unix.rb', line 156

    def teardown
      if server?
        File.unlink(@path)
        @interrupted = true
        @thread.raise(Interrupted.new)
      end
    end
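The shutdown handshake between #teardown and #run relies on setting a flag before raising an exception in the blocked thread, so that the rescuing code can tell an intended stop from a genuine error. A minimal sketch of that idea follows. `BlockingWorker` and the top-level `Interrupted` class are hypothetical, and a bare `sleep` stands in for the blocking `accept` call.

```ruby
# Hypothetical marker exception, analogous to the Interrupted class
# defined under this input's namespace.
class Interrupted < StandardError; end

class BlockingWorker
  def initialize
    @interrupted = false
  end

  # Blocks until interrupted. Returns :stopped for an intended shutdown
  # and re-raises if Interrupted arrives without the flag being set.
  def run
    @thread = Thread.current
    sleep # stand-in for a blocking call such as accept
  rescue Interrupted
    raise unless @interrupted
    :stopped
  end

  # Set the flag first, then wake the blocked thread with an exception.
  def teardown
    @interrupted = true
    @thread.raise(Interrupted.new)
  end
end
```

The order in `teardown` matters: if the exception were raised before the flag was set, the rescuing thread could observe `@interrupted` as false and treat the shutdown as an error.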