Class: Fluent::MongoOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_mongo.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MongoOutput

Returns a new instance of MongoOutput.



7
8
9
10
11
# File 'lib/fluent/plugin/out_mongo.rb', line 7

# Builds the output plugin instance.
#
# After delegating to the parent constructor, the 'mongo' and 'msgpack'
# libraries are loaded here, at instantiation time, rather than when this
# file itself is loaded.
def initialize
  super
  %w[mongo msgpack].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/fluent/plugin/out_mongo.rb', line 13

# Reads the plugin settings from +conf+ and stores them in instance variables.
#
# 'database' and 'collection' are mandatory. Host and port are obtained from
# the +host_and_port+ helper (defined outside this excerpt).
#
# When +conf+ has at least one child element, the first one is treated as the
# settings of a capped collection: 'size' is mandatory, 'max' is optional, and
# 'database' / 'collection' fall back to 'fluent' / '__backup'.
#
# @param conf configuration element; read via conf['key'] and conf.elements
# @raise [ConfigError] if 'database' or 'collection' is missing, or if the
#   capped element has no 'size'
def configure(conf)
  super

  @database_name = conf['database']
  raise ConfigError, "'database' parameter is required on Mongo output" unless @database_name

  @collection_name = conf['collection']
  raise ConfigError, "'collection' parameter is required on Mongo output" unless @collection_name

  @host, @port = host_and_port(conf)

  # Capped configuration: only the first child element is consulted.
  capped_conf = conf.elements.first
  if capped_conf
    unless capped_conf.has_key?('size')
      raise ConfigError, "'size' parameter is required on <store> of Mongo output"
    end

    # Integer() is strict, so a non-numeric 'size' or 'max' raises here.
    @capped_argument = {:capped => true}
    @capped_argument[:size] = Integer(capped_conf['size'])
    if capped_conf.has_key?('max')
      @capped_argument[:max] = Integer(capped_conf['max'])
    end

    @capped_database_name = capped_conf['database'] || 'fluent'
    @capped_collection_name = capped_conf['collection'] || '__backup'
    @capped_host, @capped_port = host_and_port(capped_conf)
  end

  # No chunk has been copied to the capped collection yet.
  @backuped = false
end

#format(tag, event) ⇒ Object



48
49
50
# File 'lib/fluent/plugin/out_mongo.rb', line 48

# Serializes one event for buffering.
#
# Only the event's record is kept; +tag+ is accepted but not used.
#
# @param tag the event tag (ignored)
# @param event an object responding to +record+
# @return whatever +record.to_msgpack+ returns
def format(tag, event)
  record = event.record
  record.to_msgpack
end

#shutdown ⇒ Object



41
42
43
44
45
46
# File 'lib/fluent/plugin/out_mongo.rb', line 41

# Closes the MongoDB connections held by this plugin, then delegates to the
# parent class's shutdown.
#
# Both collections are nil-guarded: @collection and @capped are only assigned
# in #start, so if #start never ran (or raised before assigning them) this
# method must still reach +super+ instead of failing with NoMethodError on nil.
#
# @return the result of the parent's shutdown
def shutdown
  # Original author's note: Mongo::Connection checks by itself whether it is
  # alive or already closed, so no extra state check is done before closing.
  @collection.db.connection.close unless @collection.nil?
  @capped.db.connection.close unless @capped.nil?
  super
end

#start ⇒ Object



35
36
37
38
39
# File 'lib/fluent/plugin/out_mongo.rb', line 35

# Starts the plugin: after the parent's start, opens a Mongo::Connection to
# @host/@port and stores the target collection in @collection.
#
# When capped settings were configured (@capped_argument is set), @capped is
# additionally assigned from +capped_collection+, a helper defined outside
# this excerpt.
#
# @return [nil, Object] nil without capped settings, otherwise the value
#   assigned to @capped
def start
  super

  connection = Mongo::Connection.new(@host, @port)
  database = connection.db(@database_name)
  @collection = database.collection(@collection_name)

  return if @capped_argument.nil?

  @capped = capped_collection
end

#write(chunk) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_mongo.rb', line 52

# Writes one buffered chunk to MongoDB.
#
# Every record in the chunk is decoded with MessagePack and the whole batch is
# inserted into @collection. When a capped collection is configured (@capped),
# the batch is first inserted there as well.
#
# @backuped is set once the capped insert has succeeded and is only cleared
# after the main insert succeeds. If the main insert raises, a later call
# therefore skips the capped insert -- presumably so that a retried chunk is
# not copied into the capped collection twice.
#
# @param chunk an object whose +open+ yields an IO-like stream of
#   MessagePack-encoded records
# @return [false] the value assigned to @backuped at the end
def write(chunk)
  documents = []
  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each { |document| documents.push(document) }
    rescue EOFError
      # EOFError is always raised when the end of the chunk is reached;
      # everything decoded up to that point is kept.
    end
  end

  if !@backuped && !@capped.nil?
    @capped.insert(documents)
    @backuped = true
  end

  @collection.insert(documents)
  @backuped = false
end