Class: Fluent::Plugin::NetflowipfixInput::ParserThread
- Inherits:
-
Object
- Object
- Fluent::Plugin::NetflowipfixInput::ParserThread
- Defined in:
- lib/fluent/plugin/in_netflowipfix.rb
Overview
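Worker thread that takes raw UDP datagrams from the UDP queue, decodes each one as NetFlow v5, NetFlow v9 or IPFIX (version 10) according to the version number in its first two bytes, and appends the resulting events to the event queue. (The extracted overview text, "class UdpListenerThread", appears to be a stray comment picked up from the source file rather than a description of this class.)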
Instance Method Summary collapse
- #close ⇒ Object
-
#emit(time, event, host = nil) ⇒ Object
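Appends [time, event] to the event queue, first setting event["host"] when a host is given. (The extracted summary, "def run.", appears to be a stray comment from the source file.)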
-
#initialize(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log) ⇒ ParserThread
constructor
A new instance of ParserThread.
- #join ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log) ⇒ ParserThread
Returns a new instance of ParserThread.
196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 196
# Builds a parser worker: stores the queues, poll interval and logger, then
# creates one parser per supported protocol version.
#
# @param udpQueue [#length, #pop] source of received datagrams, read by #run
# @param queuesleep [Numeric] value passed to sleep while the UDP queue is empty
# @param eventQueue [#<<, #length] destination for parsed events, written by #emit
# @param cache_ttl [Object] forwarded unchanged to the v9 and v10 parsers' configure
# @param definitions [Object] forwarded unchanged to the v9 and v10 parsers' configure
# @param log [#trace, #debug] logger used by this thread
def initialize(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log)
  @udpQueue, @queuesleep, @eventQueue, @log = udpQueue, queuesleep, eventQueue, log

  @parser_v5 = NetflowipfixInput::ParserNetflowv5.new
  @parser_v9 = NetflowipfixInput::ParserNetflowv9.new
  @parser_v10 = NetflowipfixInput::ParserIPfixv10.new

  # Only the v9 and v10 parsers are configured here; the v5 parser is not.
  [@parser_v9, @parser_v10].each do |configurable|
    configurable.configure(cache_ttl, definitions)
  end
end
Instance Method Details
#close ⇒ Object
214 215 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 214
# Intentionally does nothing and returns nil.
# NOTE(review): this does not stop the worker loop started by #start, so a
# later #join presumably blocks until the thread ends some other way — confirm
# against the caller's shutdown sequence.
def close
  nil
end
#emit(time, event, host = nil) ⇒ Object
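Appends [time, event] to the event queue, first setting event["host"] when a host is given. (The extracted summary, "def run", appears to be a stray comment from the source file.)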
254 255 256 257 258 259 260 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 254
# Queues one parsed event for downstream consumption.
#
# @param time [Object] timestamp stored alongside the event
# @param event [Hash] the record; mutated in place when host is given
# @param host [Object, nil] sender written to event["host"]; skipped when nil
# @return [Object] whatever the logger's trace call returns
def emit(time, event, host = nil)
  event["host"] = host unless host.nil?

  queue = @eventQueue
  queue << [time, event]
  @log.trace "ParserThread::emit #{queue.length}"
end
#join ⇒ Object
217 218 219 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 217
# Waits for the worker thread to finish.
#
# @thread is only assigned by #start, so calling #join first used to raise
# NoMethodError on nil; it is now a no-op in that case.
#
# @return [Thread, nil] the joined thread, or nil when #start was never called
def join
  @thread.join if @thread
end
#run ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 221
# Worker loop: repeatedly takes one entry from @udpQueue, reads the protocol
# version from the first two payload bytes and hands the decoded packet to the
# matching parser, which reports events back through #emit.
#
# Each queue entry is indexed as [time, msg]; only msg is used here, and it is
# read with the keys "message" (raw payload) and "sender" (passed to the
# handlers as host).
#
# Changes from the previous implementation:
# * the unsupported-version warning goes to the injected @log instead of the
#   global $log, matching every other log call in this class;
# * an exception raised while decoding or handling one datagram is logged and
#   the loop continues, instead of terminating the worker thread;
# * the emit callback is built once, not on every iteration.
#
# NOTE(review): assumes @log responds to #warn as the global logger did — confirm.
#
# @return [void] the loop has no exit condition
def run
  block = method(:emit)

  loop do
    # Poll rather than block on pop: sleep while there is nothing to do.
    if @udpQueue.length == 0
      sleep(@queuesleep)
      next
    end

    ar = @udpQueue.pop
    begin
      msg = ar[1]
      payload = msg["message"]
      host = msg["sender"]
      # 'n' = 16-bit unsigned, network (big-endian) byte order.
      version, = payload[0, 2].unpack('n')
      @log.trace "ParserThread::pop #{@udpQueue.length} v#{version}"

      case version
      when 5
        packet = NetflowipfixInput::Netflow5Packet.read(payload)
        @parser_v5.handle_v5(host, packet, block)
      when 9
        packet = NetflowipfixInput::Netflow9Packet.read(payload)
        @parser_v9.handle_v9(host, packet, block)
      when 10
        packet = NetflowipfixInput::Netflow10Packet.read(payload)
        @parser_v10.handle_v10(host, packet, block)
      else
        @log.warn "Unsupported Netflow version v#{version}: #{version.class}"
      end
    rescue StandardError => e
      # One bad datagram must not take down the whole parser thread.
      @log.warn "ParserThread::run dropped datagram: #{e.class}: #{e.message}"
    end
  end
end
#start ⇒ Object
209 210 211 212 |
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 209
# Spawns the worker thread that executes #run and keeps a handle to it in
# @thread so that #join can wait on it.
#
# @return [Object] whatever the logger's debug call returns
def start
  @thread = Thread.new { run }
  @log.debug "ParserThread::start"
end