Class: Fluent::Plugin::NetflowipfixInput::UdpListenerThread
- Inherits:
-
Object
- Object
- Fluent::Plugin::NetflowipfixInput::UdpListenerThread
- Defined in:
- lib/fluent/plugin/in_netflowipfix.rb
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread
constructor
A new instance of UdpListenerThread.
- #join ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread
Returns a new instance of UdpListenerThread.
153 154 155 156 157 158 159 160 161 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 153 def initialize(bind, port, udpQueue, tag, log) @port = port @udpQueue = udpQueue @udp_socket = UDPSocket.new @udp_socket.bind(bind, port) @total = 0 @tag = tag @log = log end |
Instance Method Details
#close ⇒ Object
168 169 170 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 168 def close @udp_socket.close end |
#join ⇒ Object
172 173 174 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 172 def join @thread.join end |
#run ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 177 def run loop do msg, sender = @udp_socket.recvfrom(4096) @total = @total + msg.length @log.trace "UdpListenerThread::recvfrom #{msg.length} bytes for #{@total} total on UDP/#{@port}" record = {} record["message"] = msg record["length"] = msg.length record["total"] = @total record["sender"] = sender record["port"] = @port # time = EventTime.new() time = Time.now.getutc @udpQueue << [time, record] end end |
#start ⇒ Object
163 164 165 166 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 163 def start @thread = Thread.new(&method(:run)) @log.trace "UdpListenerThread::start" end |