Class: Fluent::Plugin::NetflowipfixInput

Inherits:
Input
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_netflowipfix.rb,
lib/fluent/plugin/parser_netflow_v5.rb,
lib/fluent/plugin/parser_netflow_v9.rb,
lib/fluent/plugin/netflowipfix_records.rb

Defined Under Namespace

Classes: Header, IP4Addr, IP6Addr, MacAddr, MplsLabel, Netflow10Packet, Netflow5Packet, Netflow9Packet, OctetArray1, OctetArray2, Option10, Option9, ParserIPfixv10, ParserNetflowBase, ParserNetflowIpfix, ParserNetflowv5, ParserNetflowv9, ParserThread, PortConnection, Template10, Template9, UdpListenerThread

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 84

def configure(conf)
  super
  $log.debug "NetflowipfixInput::configure: #{@bind}:#{@port}"
  @@connections ||=  {}
  if @@connections.nil?
  end
  @@connections[@port] = PortConnection.new(@bind, @port, @tag, @cache_ttl, @definitions, @queuesleep, log)
  log.debug "NetflowipfixInput::configure NB=#{@@connections.length}"  
  @total = 0
end

#shutdownObject



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 110

def shutdown
  super
  $log.debug "NetflowipfixInput::shutdown NB=#{@@connections.length}"  
  if @@connections.nil?
  else
    @@connections.each do | port, conn |
      $log.debug "shutdown listening UDP on #{conn.bind}:#{conn.port}"
      conn.stop        
    end
    @@connections = nil
  end

end

#startObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 95

def start
  super
  
  $log.debug "NetflowipfixInput::start NB=#{@@connections.length}" 
  if @@connections.nil?
  else
    @@connections.each do | port, conn |
      $log.debug "start listening UDP on #{conn.bind}:#{conn.port}"
      conn.start       
    end
  end      
  
  waitForEvents
end

#waitForEventsObject



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 125

def waitForEvents
  loop do
      @@connections.each do | port, conn |
        if (conn.event_queue_length > 0) 
          $log.trace "waitForEvents: #{conn.bind}:#{conn.port} queue has #{conn.event_queue_length} elements"
          nbq = conn.event_queue_length 
          loop do
            ar = conn.event_pop     
            time = ar[0]
            record = ar[1]
            router.emit(conn.tag, EventTime.new(time.to_i), record)
            nbq = nbq - 1
            break if nbq == 0
          end 
        end
      end
      $log.trace "waitForEvents: sleep #{@queuesleep}"
      sleep(@queuesleep)

  end

end