Class: LogStash::Codecs::Netflow
- Defined in:
- lib/logstash/codecs/netflow.rb
Overview
The “netflow” codec is for decoding Netflow v5/v9 flows.
Constant Summary
Constants included from LogStash::Config::Mixin
LogStash::Config::Mixin::CONFIGSORT
Instance Attribute Summary
Attributes included from LogStash::Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
- #decode(payload, &block) ⇒ Object
- #initialize(params = {}) ⇒ Netflow (constructor): A new instance of Netflow.
- #register ⇒ Object
Methods inherited from Base
#clone, #encode, #flush, #on_event, #teardown
Methods included from LogStash::Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
#initialize(params = {}) ⇒ Netflow
Returns a new instance of Netflow.
20 21 22 23 |
# File 'lib/logstash/codecs/netflow.rb', line 20

# Builds a new Netflow codec instance.
#
# The params hash is handed unchanged to the parent class initializer, after
# which the @threadsafe flag is set to false.
#
# @param params [Hash] plugin settings forwarded to the superclass
def initialize(params = {})
  # Bare `super` forwards the method's own arguments (i.e. params) as-is.
  super
  @threadsafe = false
end
Instance Method Details
#decode(payload, &block) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/logstash/codecs/netflow.rb', line 32

# Decodes one Netflow datagram and yields a LogStash::Event per flow record.
#
# The header is read first; datagrams whose version is not listed in
# @versions are logged and dropped. Version 5 datagrams produce one event per
# record. Version 9 datagrams are processed flowset by flowset: template
# flowsets (id 0) and options template flowsets (id 1) are stored in
# @templates, and data flowsets (id 256..65535) are decoded with the stored
# template and produce one event per decoded record.
#
# @param payload the raw datagram, passed to Header.read and the PDU readers
# @yield [event] each decoded flow as a LogStash::Event
# @return [void]
def decode(payload, &block)
  header = Header.read(payload)

  unless @versions.include?(header.version)
    @logger.warn("Ignoring Netflow version v#{header.version}")
    return
  end

  if header.version == 5
    flowset = Netflow5PDU.read(payload)
  elsif header.version == 9
    flowset = Netflow9PDU.read(payload)
  else
    @logger.warn("Unsupported Netflow version v#{header.version}")
    return
  end

  flowset.records.each do |record|
    if flowset.version == 5
      event = LogStash::Event.new

      # FIXME Probably not doing this right WRT JRuby?
      #
      # The flowset header gives us the UTC epoch seconds along with
      # residual nanoseconds so we can set @timestamp to that easily
      event["@timestamp"] = Time.at(flowset.unix_sec, flowset.unix_nsec / 1000).utc
      event[@target] = {}

      # Copy some of the pertinent fields in the header to the event
      ['version', 'flow_seq_num', 'engine_type', 'engine_id', 'sampling_algorithm', 'sampling_interval', 'flow_records'].each do |f|
        event[@target][f] = flowset[f]
      end

      # Create fields in the event from each field in the flow record
      record.each_pair do |k, v|
        case k.to_s
        when /_switched$/
          # The flow record sets the first and last times to the device
          # uptime in milliseconds. Given the actual uptime is provided
          # in the flowset header along with the epoch seconds we can
          # convert these into absolute times
          event[@target][k.to_s] = switched_time_iso8601(flowset, v, flowset.unix_nsec / 1000)
        else
          event[@target][k.to_s] = v
        end
      end

      yield event
    elsif flowset.version == 9
      case record.flowset_id
      when 0
        # Template flowset
        record.flowset_data.templates.each do |template|
          # throw :field abandons the whole template as soon as one of its
          # fields cannot be mapped
          catch(:field) do
            fields = []
            template.fields.each do |field|
              entry = netflow_field_for(field.field_type, field.field_length)
              throw :field unless entry
              fields += entry
            end
            # We get this far, we have a list of fields
            #key = "#{flowset.source_id}|#{event["source"]}|#{template.template_id}"
            key = "#{flowset.source_id}|#{template.template_id}"
            @templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields)
            # Purge any expired templates
            @templates.cleanup!
          end
        end
      when 1
        # Options template flowset
        record.flowset_data.templates.each do |template|
          catch(:field) do
            fields = []
            template.option_fields.each do |field|
              entry = netflow_field_for(field.field_type, field.field_length)
              throw :field unless entry
              fields += entry
            end
            # We get this far, we have a list of fields
            #key = "#{flowset.source_id}|#{event["source"]}|#{template.template_id}"
            key = "#{flowset.source_id}|#{template.template_id}"
            @templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields)
            # Purge any expired templates
            @templates.cleanup!
          end
        end
      when 256..65535
        # Data flowset
        #key = "#{flowset.source_id}|#{event["source"]}|#{record.flowset_id}"
        key = "#{flowset.source_id}|#{record.flowset_id}"
        template = @templates[key]

        unless template
          #@logger.warn("No matching template for flow id #{record.flowset_id} from #{event["source"]}")
          @logger.warn("No matching template for flow id #{record.flowset_id}")
          next
        end

        # The 4 bytes subtracted here are not part of the record data
        length = record.flowset_length - 4

        # Template shouldn't be longer than the record and there should
        # be at most 3 padding bytes. A zero-length template is rejected
        # up front so the modulo below cannot raise ZeroDivisionError.
        if template.num_bytes == 0 || template.num_bytes > length || !(length % template.num_bytes).between?(0, 3)
          @logger.warn("Template length doesn't fit cleanly into flowset", :template_id => record.flowset_id, :template_length => template.num_bytes, :record_length => length)
          next
        end

        array = BinData::Array.new(:type => template, :initial_length => length / template.num_bytes)

        records = array.read(record.flowset_data)

        records.each do |r|
          event = LogStash::Event.new(
            "@timestamp" => Time.at(flowset.unix_sec).utc,
            @target => {}
          )

          # Fewer fields in the v9 header
          ['version', 'flow_seq_num'].each do |f|
            event[@target][f] = flowset[f]
          end

          event[@target]['flowset_id'] = record.flowset_id

          r.each_pair do |k, v|
            case k.to_s
            when /_switched$/
              # v9 did away with the nanosecs field, so the header time
              # contributes no sub-second part
              event[@target][k.to_s] = switched_time_iso8601(flowset, v, 0)
            else
              event[@target][k.to_s] = v
            end
          end

          yield event
        end
      else
        @logger.warn("Unsupported flowset id #{record.flowset_id}")
      end
    end
  end
end

# Converts a "*_switched" device-uptime value into an absolute UTC timestamp
# string formatted as "%Y-%m-%dT%H:%M:%S.%3NZ".
#
# The result is the header time (flowset.unix_sec plus header_micros) minus
# the number of milliseconds between switched_uptime and flowset.uptime.
#
# @param flowset the decoded PDU; its uptime and unix_sec are read
# @param switched_uptime the uptime value taken from the flow record
# @param header_micros [Integer] sub-second part of the header time, in
#   microseconds
# @return [String] the formatted timestamp
def switched_time_iso8601(flowset, switched_uptime, header_micros)
  millis_ago = flowset.uptime - switched_uptime
  seconds = flowset.unix_sec - (millis_ago / 1000)
  # The sub-second remainder is in milliseconds; scale it to microseconds
  # before subtracting it from a microsecond quantity.
  micros = header_micros - ((millis_ago % 1000) * 1000)
  if micros < 0
    # Borrow one second. Ruby has no `--` operator, so this must be `-= 1`.
    seconds -= 1
    micros += 1_000_000
  end
  # FIXME Again, probably doing this wrong WRT JRuby?
  Time.at(seconds, micros).utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
end
private :switched_time_iso8601