Class: LogStash::Inputs::Collectd
- Inherits:
-
Base
show all
- Defined in:
- lib/logstash/inputs/collectd.rb
Constant Summary
Config::Mixin::CONFIGSORT
Instance Attribute Summary
Attributes inherited from Base
#params, #threadable
#config, #original_params
Attributes inherited from Plugin
#logger, #params
Instance Method Summary
collapse
Methods inherited from Base
#tag
#config_init, included
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
#initialize(params) ⇒ Collectd
Returns a new instance of Collectd.
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/logstash/inputs/collectd.rb', line 59
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
@idbyte = 0
@length = 0
@prev_typenum = 0
@header = []; @body = []
@timestamp = Time.now().utc
@collectd = {}
@types = {}
end
|
Instance Method Details
#get_types(paths) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/logstash/inputs/collectd.rb', line 113
def get_types(paths)
paths.each do |path|
@logger.info("Getting Collectd typesdb info", :typesdb => path.to_s)
File.open(path, 'r').each_line do |line|
typename, *line = line.strip.split
next if typename.nil? || if typename[0,1] != '#' v = line.collect { |l| l.strip.split(":")[0] }
@types[typename] = v
end
end
end
@logger.debug("Collectd Types", :types => @types.to_s)
end
|
#get_values(id, body) ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
# File 'lib/logstash/inputs/collectd.rb', line 156
def get_values(id, body)
retval = ''
case id
when 0,2,3,4,5,100 retval = body.pack("C*")
retval = retval[0..-2]
when 1 byte1, byte2 = body.pack("C*").unpack("NN")
retval = Time.at(( ((byte1 << 32) + byte2))).utc
when 7,101 retval = body.slice!(0..7).pack("C*").unpack("E")[0]
when 8 byte1, byte2 = body.pack("C*").unpack("NN")
retval = Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
when 9 byte1, byte2 = body.pack("C*").unpack("NN")
retval = (((byte1 << 32) + byte2) * (2**-30)).to_i
when 6 val_bytes = body.slice!(0..1)
val_count = val_bytes.pack("C*").unpack("n")
if body.length % 9 == 0 count = 0
retval = []
types = body.slice!(0..((body.length/9)-1))
while body.length > 0
vtype = vt_map(types[count])
case types[count]
when 0, 3; v = body.slice!(0..7).pack("C*").unpack("Q>")[0]
when 1; v = body.slice!(0..7).pack("C*").unpack("E")[0]
when 2; v = body.slice!(0..7).pack("C*").unpack("q>")[0]
else; v = 0
end
retval << v
count += 1
end
else
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
end
end
case id
when 2
if @plugin != retval @plugin_instance = ''
@collectd.delete('plugin_instance')
end
@plugin = retval
when 0; @cdhost = retval
when 3; @plugin_instance = retval
when 4; @cdtype = retval
when 5; @type_instance = retval
when 1,8; @timestamp = retval
end
return retval
end
|
#register ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/logstash/inputs/collectd.rb', line 72
def register
@udp = nil
if @typesdb.nil?
if __FILE__ =~ /^file:\/.+!.+/
begin
jar_path = [__FILE__.split("!").first, "/types.db"].join("!")
@typesdb = [jar_path]
rescue => ex
raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}"
end
else
if File.exists?("types.db")
@typesdb = ["types.db"]
elsif File.exists?("vendor/collectd/types.db")
@typesdb = ["vendor/collectd/types.db"]
else
raise "You must specify 'typesdb => ...' in your collectd input"
end
end
end
@logger.info("Using internal types.db", :typesdb => @typesdb.to_s)
end
|
#run(output_queue) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/logstash/inputs/collectd.rb', line 97
def run(output_queue)
begin
get_types(@typesdb)
collectd_listener(output_queue)
rescue LogStash::ShutdownSignal
rescue => e
@logger.warn("Collectd listener died", :exception => e, :backtrace => e.backtrace)
sleep(5)
retry
end end
|
#teardown ⇒ Object
302
303
304
|
# File 'lib/logstash/inputs/collectd.rb', line 302
def teardown
@udp.close if @udp && !@udp.closed?
end
|
#type_map(id) ⇒ Object
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/logstash/inputs/collectd.rb', line 129
def type_map(id)
case id
when 0; return "host"
when 1,8; return "@timestamp"
when 2; return "plugin"
when 3; return "plugin_instance"
when 4; return "collectd_type"
when 5; return "type_instance"
when 6; return "values"
when 9; return "interval"
when 100; return "message"
when 101; return "severity"
end
end
|
#vt_map(id) ⇒ Object
145
146
147
148
149
150
151
152
153
|
# File 'lib/logstash/inputs/collectd.rb', line 145
def vt_map(id)
case id
when 0; return "COUNTER"
when 1; return "GAUGE"
when 2; return "DERIVE"
when 3; return "ABSOLUTE"
else; return 'UNKNOWN'
end
end
|