Class: LogStash::Inputs::MongoDB

Inherits:
Base < Object
Defined in:
lib/logstash/inputs/mongodb.rb

Constant Summary

# Name of the SQLite table that holds the per-collection placeholders
# (rows with a :table key and a :place value).
SINCE_TABLE = :since_table

Instance Method Summary

Instance Method Details

#close ⇒ Object

def run



387
388
389
390
# File 'lib/logstash/inputs/mongodb.rb', line 387

# Shutdown hook.
#
# Releases the connections opened by #register: the MongoDB client behind
# @mongodb and the Sequel/SQLite handle @sqlitedb. Either may be nil when
# #register did not complete, so each is released only when present.
#
# @return [void]
def close
  @logger.debug("Shutting down...")
  @mongodb.client.close if @mongodb
  @sqlitedb.disconnect if @sqlitedb
end

#flatten(my_hash) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/logstash/inputs/mongodb.rb', line 195

# Flatten a (possibly nested) document into a single-level Hash.
#
# Every nested Hash value is merged into the result with its keys joined by
# "_" to all enclosing keys, at any depth:
#   {"a" => {"b" => {"c" => 1}}}  becomes  {"a_b_c" => 1}
# All result keys are Strings. Non-Hash values (including Arrays) are copied
# through unchanged, and an empty nested Hash contributes no keys. If two
# paths produce the same key, the one visited last wins.
#
# @param my_hash [#each] the document to flatten, normally a Hash
# @return [Hash{String => Object}] the flattened copy; empty when +my_hash+
#   cannot be iterated
def flatten(my_hash)
  new_hash = {}
  @logger.debug("Raw Hash: #{my_hash}")
  if my_hash.respond_to?(:each)
    # Depth-first walk; +prefix+ is the already-joined path to +node+ (nil at the top).
    walk = lambda do |node, prefix|
      node.each do |key, value|
        path = prefix ? "#{prefix}_#{key}" : key.to_s
        if value.is_a?(Hash)
          walk.call(value, path)
        else
          new_hash[path] = value
        end
      end
    end
    walk.call(my_hash, nil)
  else
    # Report at warn level: a debug-only message hides unexpected input.
    @logger.warn("Flatten [ERROR]: hash did not respond to :each")
  end
  @logger.debug("Flattened Hash: #{new_hash}")
  new_hash
end

#get_all_tables(mongodb) ⇒ Object



130
131
132
# File 'lib/logstash/inputs/mongodb.rb', line 130

# List every collection name in a MongoDB database.
#
# The original ignored its argument and always read @mongodb; the argument
# is now honoured, with @mongodb (set in #register) used only when it is nil.
#
# @param mongodb [#collection_names, nil] database to inspect
# @return [Array<String>] collection names
def get_all_tables(mongodb)
  (mongodb || @mongodb).collection_names
end

#get_collection_names(mongodb, collection) ⇒ Object



135
136
137
138
139
140
141
142
143
144
# File 'lib/logstash/inputs/mongodb.rb', line 135

# Return the collection names that match a search pattern.
#
# +collection+ is interpolated into a Regexp and matched unanchored against
# every collection name, so plain names match as substrings and regex syntax
# (e.g. "^events_") is honoured. Each match is logged at debug level.
#
# @param mongodb [#collection_names, nil] database to search; @mongodb (set in
#   #register) is used when nil
# @param collection [String] regular-expression source to match names against
# @return [Array<String>] matching names, in the order the database lists them
def get_collection_names(mongodb, collection)
  # Build the pattern once instead of once per collection name.
  pattern = /#{collection}/
  (mongodb || @mongodb).collection_names.select do |coll|
    matched = pattern =~ coll
    if matched
      @logger.debug("Added #{coll} to the collection list as it matches our collection search")
    end
    matched
  end
end

#get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size) ⇒ Object



147
148
149
150
151
152
# File 'lib/logstash/inputs/mongodb.rb', line 147

# Build the query for the next batch of documents from one collection.
#
# Documents are selected where the configured +since_column+ is strictly
# greater than +last_id_object+. Previously the filter was hard-coded to
# +_id+, so a non-_id since_column (e.g. a time field) never matched. Results
# are sorted ascending on that column, so the last document of the batch
# carries the highest value and is safe to store as the next placeholder.
#
# @param mongodb [Mongo::Database] database holding the collection
# @param mongo_collection_name [String] collection to read
# @param last_id_object [Object] placeholder already converted for comparison
#   (run passes a BSON::ObjectId, a Time, or the raw stored value)
# @param batch_size [Integer] maximum number of documents to return
# @return [Object] the driver's lazy query view (presumably a Mongo::Collection::View)
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  collection.find(since_column => { :$gt => last_id_object }).sort(since_column => 1).limit(batch_size)
end

#get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/logstash/inputs/mongodb.rb', line 109

# Return the stored placeholder for a collection, initialising it if absent.
#
# Placeholder rows are keyed "<since_table>_<collection>" in SINCE_TABLE.
# The row is read with Dataset#first. The previous `dataset[:place]` was
# Sequel's Dataset#[], i.e. `WHERE place LIMIT 1`, so any stored value SQLite
# evaluates as false (text starting with "0" or a letter) looked missing and
# triggered a re-initialisation from the collection's first document.
#
# A missing row, or a place that is nil, 0 or empty, is treated as
# uninitialised and delegated to #init_placeholder.
#
# @param sqlitedb [Sequel::Database] placeholder database
# @param since_table [String] key prefix for placeholder rows
# @param mongodb [Mongo::Database] passed through to #init_placeholder
# @param mongo_collection_name [String] collection whose placeholder is wanted
# @return [String, Integer] the stored or newly initialised placeholder
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  row = since.where(:table => "#{since_table}_#{mongo_collection_name}").first
  place = row && row[:place]
  if place.nil? || place == 0 || place.to_s.empty?
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{place}")
    place
  end
end

#init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/logstash/inputs/mongodb.rb', line 91

# Seed the placeholder for a collection from its lowest since_column value.
#
# The first document, by ascending since_column, supplies the placeholder:
# its value as a String when since_type is 'id', otherwise as an Integer.
# When the collection is empty, the lowest possible value is used instead of
# raising NoMethodError on a nil document: 24 zero hex digits (an ObjectId
# below every real one) for 'id', otherwise 0. Documents inserted later are
# then still picked up.
#
# The row keyed "<since_table>_<collection>" is updated if it already exists
# and inserted otherwise, so repeated initialisation does not pile up
# duplicate rows.
#
# @param sqlitedb [Sequel::Database] placeholder database
# @param since_table [String] key prefix for placeholder rows
# @param mongodb [Mongo::Database] database holding the collection
# @param mongo_collection_name [String] collection to initialise
# @return [String, Integer] the placeholder that was stored
def init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  placeholder_key = "#{since_table}_#{mongo_collection_name}"
  @logger.debug("init placeholder for #{placeholder_key}")
  since = sqlitedb[SINCE_TABLE]
  mongo_collection = mongodb.collection(mongo_collection_name)

  first_entry = mongo_collection.find({}).sort(since_column => 1).limit(1).first
  if first_entry.nil?
    first_entry_id = since_type == 'id' ? '0' * 24 : 0
    @logger.debug("collection #{mongo_collection_name} is empty, using lowest placeholder #{first_entry_id}")
  elsif since_type == 'id'
    first_entry_id = first_entry[since_column].to_s
  else
    first_entry_id = first_entry[since_column].to_i
  end

  # Upsert: Dataset#update returns the number of rows it changed.
  if since.where(:table => placeholder_key).update(:place => first_entry_id) == 0
    since.insert(:table => placeholder_key, :place => first_entry_id)
  end
  @logger.info("init placeholder for #{placeholder_key}: #{first_entry}") if first_entry
  first_entry_id
end

#init_placeholder_table(sqlitedb) ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/logstash/inputs/mongodb.rb', line 79

# Ensure the SQLite placeholder table exists, with a String :table column
# (the "<since_table>_<collection>" key) and an Int :place column.
#
# Checks table_exists? rather than attempting CREATE TABLE and rescuing
# every error. The old bare rescue reported any failure (e.g. an unwritable
# database) as "since table already exists"; such errors now propagate.
#
# @param sqlitedb [Sequel::Database] placeholder database
# @return [void]
def init_placeholder_table(sqlitedb)
  if sqlitedb.table_exists?(SINCE_TABLE)
    @logger.debug("since table already exists")
    return
  end
  sqlitedb.create_table("#{SINCE_TABLE}") do
    String :table
    Int :place
  end
end

#register ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/logstash/inputs/mongodb.rb', line 169

# Plugin setup.
#
# Connects to MongoDB at @uri and to the SQLite placeholder database at
# @placeholder_db_dir/@placeholder_db_name. Records the local hostname (used
# as the "host" field of every event) and loads the initial set of watched
# collections, those matching @collection, with their placeholders.
def register
  # Required at registration time rather than when the file is loaded.
  require "jdbc/sqlite3"
  require "sequel"
  sqlite_file = File.join(@placeholder_db_dir, @placeholder_db_name)
  mongo_client = Mongo::Client.new(@uri)

  @host = Socket.gethostname
  @logger.info("Registering MongoDB input")

  @mongodb = mongo_client.database
  @sqlitedb = Sequel.connect("jdbc:sqlite:" + sqlite_file)

  # #run refreshes this list on every polling pass; this is the starting set.
  @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)
end

#run(queue) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/logstash/inputs/mongodb.rb', line 225

# Main input loop.
#
# Each pass, for every entry in @collection_data:
#   1. converts the stored placeholder (:last_id) into the value the query
#      compares against: a BSON::ObjectId for since_type 'id', a Time for
#      since_type 'time' (unless the placeholder is ''), else the raw value;
#   2. fetches up to batch_size newer documents via get_cursor_for_collection;
#   3. builds one LogStash::Event per document, shaped by @parse_method
#      ('flatten', 'dig' or 'simple'), and pushes it onto +queue+;
#   4. records each document's since_column value as :last_id and persists
#      the final one with update_placeholder.
# It then refreshes @collection_data with update_watched_collections.
#
# Polling backs off exponentially (sleep_min doubling up to sleep_max) while
# no documents are returned, and resets to sleep_min as soon as a pass finds
# data. A StandardError raised during a pass is logged and the pass is retried
# after sleep_max seconds, so a persistent failure cannot busy-loop.
#
# @param queue [#<<] destination for generated events
# @return [void] returns once stop? is true
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing MongoDB")
  @logger.debug("Collection data is: #{@collection_data}")

  until stop?
    begin
      found_docs = false
      @collection_data.each do |index, collection|
        collection_name = collection[:name]
        @logger.debug("collection_data is: #{@collection_data}")
        last_id = @collection_data[index][:last_id]

        # Turn the stored placeholder back into the type the query compares against.
        last_id_object = last_id
        if since_type == 'id'
          last_id_object = BSON::ObjectId(last_id)
        elsif since_type == 'time'
          if last_id != ''
            last_id_object = Time.at(last_id)
          end
        end
        cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
        cursor.each do |doc|
          found_docs = true
          logdate = DateTime.parse(doc['_id'].generation_time.to_s)
          event = LogStash::Event.new("host" => @host)
          decorate(event)
          event.set("logdate",logdate.iso8601.force_encoding(Encoding::UTF_8))
          # The whole document, rendered with Hash#to_s.
          log_entry = doc.to_h.to_s
          event.set("log_entry",log_entry.force_encoding(Encoding::UTF_8))
          event.set("mongo_id",doc['_id'].to_s)
          @logger.debug("mongo_id: "+doc['_id'].to_s)
          # Extract the HOST_ID and PID from the MongoDB BSON::ObjectID
          if @unpack_mongo_id
            doc_hex_bytes = doc['_id'].to_s.each_char.each_slice(2).map {|b| b.join.to_i(16) }
            doc_obj_bin = doc_hex_bytes.pack("C*").unpack("a4 a3 a2 a3")
            host_id = doc_obj_bin[1].unpack("S")
            process_id = doc_obj_bin[2].unpack("S")
            event.set('host_id',host_id.first.to_i)
            event.set('process_id',process_id.first.to_i)
          end

          if @parse_method == 'flatten'
            # Flatten the JSON so that the data is usable in Kibana
            flat_doc = flatten(doc)
            # Legacy-format fix (per the original author): expand the JSON after the
            # "collection stats: " prefix into collection_stats_* fields.
            if flat_doc['info_message'] && (flat_doc['info_message']  =~ /collection stats: .+/)
              sub_value = flat_doc['info_message'].sub("collection stats: ", "")
              begin
                JSON.parse(sub_value).each do |k1,v1|
                  flat_doc["collection_stats_#{k1.to_s}"] = v1
                end
              rescue JSON::ParserError => e
                # Skip the stats instead of aborting the pass: an exception here stops the
                # pass before :last_id moves past this document, so every retry would hit it again.
                @logger.warn("Could not parse collection stats", :mongo_id => doc['_id'].to_s, :exception => e)
              end
            end

            flat_doc.each do |k,v|
              @logger.debug("key: #{k.to_s} value: #{v.to_s}")
              if v.is_a? Numeric
                event.set(k.to_s,v)
              elsif v.is_a? Time
                event.set(k.to_s,v.iso8601)
              elsif v.is_a? String
                if v == "NaN"
                  event.set(k.to_s, Float::NAN)
                elsif /\A[-+]?\d+[.][\d]+\z/ === v
                  # === performs a match; Regexp#== compares two regexps and is never true for a String.
                  event.set(k.to_s, v.to_f)
                elsif /\A[-+]?\d+\z/ === v
                  event.set(k.to_s, v.to_i)
                else
                  event.set(k.to_s, v)
                end
              else
                if k.to_s  == "_id" || k.to_s == "tags"
                  event.set(k.to_s, v.to_s )
                end
                if (k.to_s == "tags") && (v.is_a? Array)
                  event.set('tags',v)
                end
              end
            end
          elsif @parse_method == 'dig'
            # Dig into the JSON and flatten select elements
            doc.each do |k, v|
              if k != "_id"
                if (@dig_fields.include? k) && (v.respond_to? :each)
                  v.each do |kk, vv|
                    if (@dig_dig_fields.include? kk) && (vv.respond_to? :each)
                      vv.each do |kkk, vvv|
                        if /\A[-+]?\d+\z/ === vvv
                          event.set("#{k}_#{kk}_#{kkk}",vvv.to_i)
                        else
                          event.set("#{k}_#{kk}_#{kkk}", vvv.to_s)
                        end
                      end
                    else
                      if /\A[-+]?\d+\z/ === vv
                        event.set("#{k}_#{kk}", vv.to_i)
                      else
                        event.set("#{k}_#{kk}",vv.to_s)
                      end
                    end
                  end
                else
                  if /\A[-+]?\d+\z/ === v
                    event.set(k,v.to_i)
                  else
                    event.set(k,v.to_s)
                  end
                end
              end
            end
          elsif @parse_method == 'simple'
            doc.each do |k, v|
              if v.is_a?(Numeric) || v.is_a?(Array)
                # Numbers pass through unchanged; taking .abs silently dropped their sign.
                event.set(k, v)
              elsif v == "NaN"
                event.set(k, Float::NAN)
              else
                event.set(k, v.to_s)
              end
            end
          end

          queue << event

          # Remember this document's position so the next pass resumes after it.
          since_id = doc[since_column]
          if since_type == 'id'
            since_id = doc[since_column].to_s
          elsif since_type == 'time'
            since_id = doc[since_column].to_i
          end

          @collection_data[index][:last_id] = since_id
        end
        # Store the last-seen doc in the database
        update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
      end
      @logger.debug("Updating watch collections")
      @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

      if found_docs
        # Data is flowing: restart the back-off and poll again immediately.
        sleeptime = sleep_min
      else
        @logger.debug("No new rows. Sleeping.", :time => sleeptime)
        sleeptime = [sleeptime * 2, sleep_max].min
        sleep(sleeptime)
      end
    rescue => e
      @logger.warn('MongoDB Input threw an exception, restarting', :exception => e)
      # Pause before retrying so a persistent failure (e.g. MongoDB unreachable) cannot spin the loop.
      sleep(sleep_max)
    end
  end
end

#update_placeholder(sqlitedb, since_table, mongo_collection_name, place) ⇒ Object



123
124
125
126
127
# File 'lib/logstash/inputs/mongodb.rb', line 123

# Persist +place+ as the new placeholder for +mongo_collection_name+.
#
# Every SINCE_TABLE row keyed "<since_table>_<collection>" is updated; a
# missing row is not created.
#
# @param sqlitedb [Sequel::Database] placeholder database
# @param since_table [String] key prefix for placeholder rows
# @param mongo_collection_name [String] collection whose placeholder moves
# @param place [String, Integer] new placeholder value
# @return the result of the dataset update (the affected-row count in Sequel)
def update_placeholder(sqlitedb, since_table, mongo_collection_name, place)
  row_key = "#{since_table}_#{mongo_collection_name}"
  matching_rows = sqlitedb[SINCE_TABLE].where(:table => row_key)
  matching_rows.update(:place => place)
end

#update_watched_collections(mongodb, collection, sqlitedb) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/logstash/inputs/mongodb.rb', line 155

# Build the map of watched collections and their current placeholders.
#
# Collections are those whose names match +collection+
# (see get_collection_names). The placeholder table is ensured once per call,
# and only when something matched, rather than once per collection on every
# poll.
#
# @param mongodb [Mongo::Database] database to search
# @param collection [String] collection-name pattern
# @param sqlitedb [Sequel::Database] placeholder database
# @return [Hash{String => Hash}] name => { :name => name, :last_id => placeholder }
def update_watched_collections(mongodb, collection, sqlitedb)
  collections = get_collection_names(mongodb, collection)
  init_placeholder_table(sqlitedb) unless collections.empty?
  collections.each_with_object({}) do |my_collection, collection_data|
    # Keep the first entry should a name ever be listed twice.
    next if collection_data[my_collection]
    last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
    collection_data[my_collection] = { :name => my_collection, :last_id => last_id }
  end
end