Class: Fluent::MongoTailInput

Inherits:
Input
  • Object
show all
Includes:
MongoUtil
Defined in:
lib/fluent/plugin/in_mongo_tail.rb

Instance Method Summary collapse

Methods included from MongoUtil

#authenticate, included

Constructor Details

#initialize ⇒ MongoTailInput

Returns a new instance of MongoTailInput.



33
34
35
36
37
38
39
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 33

# Builds the plugin instance.
#
# The MongoDB driver libraries are loaded here rather than at file load
# time, and the connection option hash starts out empty (configure adds
# the :ssl entry to it).
def initialize
  super

  %w[mongo bson].each { |library| require library }

  @connection_options = {}
end

Instance Method Details

#configure(conf) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 41

# Validates the plugin configuration and prepares tailing state.
#
# @param conf the configuration element; it is only forwarded to the
#   parent class through the implicit super call
# @raise [ConfigError] when neither 'tag' nor 'tag_key' is set, when both
#   'database' and 'url' are set, or when neither of them is set
def configure(conf)
  super

  unless @tag || @tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
  end

  # Exactly one of 'database' / 'url' has to be given.
  raise ConfigError, "Both 'database' and 'url' can not be set" if @database && @url
  raise ConfigError, "One of 'database' or 'url' must be specified" unless @database || @url

  @last_id = get_last_id
  @connection_options[:ssl] = @ssl

  # The mode is reported as persistent when either id store is configured.
  mode = (@id_store_file || @id_store_collection) ? 'persistent' : 'non-persistent'
  $log.debug "Setup mongo_tail configuration: mode = #{mode}, last_id = #{@last_id}"
end

#run ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 79

# Tails the collection until @stop becomes true.
#
# Every document returned by the cursor is passed to process_document.
# When the cursor has nothing to return, the loop sleeps for @wait_time
# before polling again. A cursor that is no longer alive, or that raised,
# is replaced by a new one built from cursor_conf so tailing continues.
#
# Errors are logged instead of being silently discarded:
# * Mongo::OperationFailure (e.g. CURSOR_NOT_FOUND) is treated as an
#   expected event: logged at debug level and retried immediately.
# * Any other StandardError is logged at warn level and followed by a
#   @wait_time pause, so a persistent failure cannot spin the CPU.
def run
  loop {
    cursor = Mongo::Cursor.new(@client, cursor_conf)
    begin
      loop {
        return if @stop

        cursor = Mongo::Cursor.new(@client, cursor_conf) unless cursor.alive?
        if (doc = cursor.next_document)
          process_document(doc)
        else
          sleep @wait_time
        end
      }
    rescue Mongo::OperationFailure => e
      $log.debug "mongo_tail: cursor failed, recreating it: #{e.class}: #{e.message}"
    rescue => e
      $log.warn "mongo_tail: unexpected error while tailing, retrying: #{e.class}: #{e.message}"
      sleep @wait_time
    end
  }
end

#shutdown ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 69

# Stops the tail thread, persists the last seen id and releases resources.
#
# Order matters: the tail thread is asked to stop and joined before the id
# is saved and the id storage is closed, so nothing is written to the
# storage after it has been closed.
# NOTE(review): this assumes the tail thread (via process_document) may
# update @last_id or touch the id storage — confirm against that method.
#
# The id is saved only when one exists; the previous `unless @last_id`
# guard was inverted and called save_last_id only with nil.
def shutdown
  @stop = true
  @thread.join

  save_last_id(@last_id) if @last_id
  close_id_storage

  @client.db.connection.close
  super
end

#start ⇒ Object



62
63
64
65
66
67
# File 'lib/fluent/plugin/in_mongo_tail.rb', line 62

# Starts the plugin: opens the id storage, obtains the capped collection
# used for tailing and launches the background thread that executes run.
def start
  super

  open_id_storage
  @client = get_capped_collection

  # The tail loop runs in its own thread; shutdown joins it.
  @thread = Thread.new { run }
end