Class: Fluent::Plugin::NetflowipfixInput::UdpListenerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_netflowipfix.rb

Instance Method Summary collapse

Constructor Details

#initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread

Returns a new instance of UdpListenerThread.



153
154
155
156
157
158
159
160
161
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 153

# Builds the listener state and binds a new UDP socket right away.
#
# @param bind     local address passed straight to UDPSocket#bind
# @param port     local port passed to UDPSocket#bind; also kept in @port
# @param udpQueue object stored in @udpQueue (must respond to <<)
# @param tag      stored in @tag
# @param log      logger stored in @log (must respond to #trace)
# @raise whatever UDPSocket#bind raises; the socket is closed before the
#   error is re-raised so a failed bind does not leak a descriptor
def initialize(bind, port, udpQueue, tag, log)
	@port = port
	@udpQueue = udpQueue
	@total = 0
	@tag = tag
	@log = log
	@udp_socket = UDPSocket.new
	begin
		@udp_socket.bind(bind, port)
	rescue StandardError
		# The caller never receives this object when bind fails, so nobody
		# else could close the socket; release it here and re-raise unchanged.
		@udp_socket.close
		raise
	end
end

Instance Method Details

#close ⇒ Object



168
169
170
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 168

# Closes the UDP socket held in @udp_socket and returns whatever the
# socket's own #close returns.
def close
		socket = @udp_socket
		socket.close
end

#join ⇒ Object



172
173
174
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 172

# Blocks until the thread stored in @thread (assigned by #start) finishes,
# and returns the result of that thread's #join.
def join
		worker = @thread
		worker.join
end

#run ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 177

# Receive loop; #start runs this method on its own thread.
#
# For every datagram read from @udp_socket it adds the payload size to the
# running @total, traces the event, and appends a [time, record] pair to
# @udpQueue. The record carries the raw message, its length, the running
# total, the sender info returned by recvfrom, and the listening port.
#
# The loop has no exit condition of its own: it stops when the socket is
# closed (see #close), which makes the pending or next recvfrom raise. That
# failure is treated as a normal shutdown so the thread ends cleanly and
# Thread#join does not re-raise it. Any read error on a socket that is still
# open is propagated unchanged.
def run
		loop do
			# NOTE(review): at most 4096 bytes are read per datagram; a larger
			# datagram would presumably be truncated - confirm against senders.
			msg, sender = @udp_socket.recvfrom(4096)
			@total += msg.length
			@log.trace "UdpListenerThread::recvfrom #{msg.length} bytes for #{@total} total on UDP/#{@port}"
			record = {
				"message" => msg,
				"length" => msg.length,
				"total" => @total,
				"sender" => sender,
				"port" => @port
			}
			# The timestamp is a plain UTC Time, not a Fluent EventTime.
			time = Time.now.getutc
			@udpQueue << [time, record]
		end
rescue IOError, Errno::EBADF
		# Only swallow the error when it is the consequence of a deliberate close.
		raise unless @udp_socket.closed?
		@log.trace "UdpListenerThread::run stopped, socket closed on UDP/#{@port}"
end

#start ⇒ Object



163
164
165
166
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 163

# Spawns the thread that executes #run, keeps it in @thread so #join can
# wait on it, then traces that the listener was started.
def start
	@thread = Thread.new { run }
	@log.trace("UdpListenerThread::start")
end