Class: Fluent::Plugin::NetflowipfixInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::NetflowipfixInput
show all
- Includes:
- DetachMultiProcessMixin
- Defined in:
- lib/fluent/plugin/in_netflowipfix.rb,
lib/fluent/plugin/parser_netflow_v5.rb,
lib/fluent/plugin/parser_netflow_v9.rb,
lib/fluent/plugin/netflowipfix_records.rb
Defined Under Namespace
Classes: Header, IP4Addr, IP6Addr, MacAddr, MplsLabel, Netflow10Packet, Netflow5Packet, Netflow9Packet, OctetArray1, OctetArray2, Option10, Option9, ParserIPfixv10, ParserNetflowBase, ParserNetflowIpfix, ParserNetflowv5, ParserNetflowv9, ParserThread, PortConnection, Template10, Template9, UdpListenerThread
Instance Method Summary
collapse
Instance Method Details
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 84
def configure(conf)
super
$log.debug "NetflowipfixInput::configure: #{@bind}:#{@port}"
@@connections ||= {}
if @@connections.nil?
end
@@connections[@port] = PortConnection.new(@bind, @port, @tag, @cache_ttl, @definitions, @queuesleep, log)
log.debug "NetflowipfixInput::configure NB=#{@@connections.length}"
@total = 0
end
|
#shutdown ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 110
def shutdown
super
$log.debug "NetflowipfixInput::shutdown NB=#{@@connections.length}"
if @@connections.nil?
else
@@connections.each do | port, conn |
$log.debug "shutdown listening UDP on #{conn.bind}:#{conn.port}"
conn.stop
end
@@connections = nil
end
end
|
#start ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 95
def start
super
$log.debug "NetflowipfixInput::start NB=#{@@connections.length}"
if @@connections.nil?
else
@@connections.each do | port, conn |
$log.debug "start listening UDP on #{conn.bind}:#{conn.port}"
conn.start
end
end
waitForEvents
end
|
#waitForEvents ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 125
def waitForEvents
loop do
@@connections.each do | port, conn |
if (conn.event_queue_length > 0)
$log.trace "waitForEvents: #{conn.bind}:#{conn.port} queue has #{conn.event_queue_length} elements"
nbq = conn.event_queue_length
loop do
ar = conn.event_pop
time = ar[0]
record = ar[1]
router.emit(conn.tag, EventTime.new(time.to_i), record)
nbq = nbq - 1
break if nbq == 0
end
end
end
$log.trace "waitForEvents: sleep #{@queuesleep}"
sleep(@queuesleep)
end
end
|