Class: Fluent::MongoOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::MongoOutput
- Defined in:
- lib/fluent/plugin/out_mongo.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, event) ⇒ Object
- #initialize ⇒ MongoOutput (constructor): a new instance of MongoOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MongoOutput
Returns a new instance of MongoOutput.
7 8 9 10 11 |
# File 'lib/fluent/plugin/out_mongo.rb', line 7
# Builds a new MongoOutput.
#
# The parent initializer runs first; afterwards the 'mongo' and 'msgpack'
# libraries are loaded (in that order), so they are only required once an
# instance of this output is actually created.
def initialize
  super
  %w[mongo msgpack].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin/out_mongo.rb', line 13
# Reads the plugin settings out of +conf+ and stores them in instance
# variables for later use.
#
# Required keys are 'database' and 'collection'. If +conf+ has at least one
# nested element, the first one is treated as the configuration of a capped
# backup collection: it must carry 'size', may carry 'max', and falls back
# to database 'fluent' / collection '__backup' when those keys are absent.
#
# conf - the configuration element handed over by the framework.
#
# Raises ConfigError when 'database' or 'collection' is missing, or when the
# nested element has no 'size'. Integer() raises if 'size' or 'max' is not
# an integer literal.
def configure(conf)
  super

  @database_name = conf['database']
  unless @database_name
    raise ConfigError, "'database' parameter is required on Mongo output"
  end

  @collection_name = conf['collection']
  unless @collection_name
    raise ConfigError, "'collection' parameter is required on Mongo output"
  end

  # host_and_port is defined elsewhere in the plugin.
  @host, @port = host_and_port(conf)

  # Optional capped (backup) collection settings.
  backup_conf = conf.elements.first
  if backup_conf
    unless backup_conf.has_key?('size')
      raise ConfigError, "'size' parameter is required on <store> of Mongo output"
    end

    @capped_argument = {:capped => true}
    @capped_argument[:size] = Integer(backup_conf['size'])
    if backup_conf.has_key?('max')
      @capped_argument[:max] = Integer(backup_conf['max'])
    end

    @capped_database_name = backup_conf['database'] || 'fluent'
    @capped_collection_name = backup_conf['collection'] || '__backup'
    @capped_host, @capped_port = host_and_port(backup_conf)
  end

  # Tracks whether the current batch was already copied to the backup.
  @backuped = false
end
#format(tag, event) ⇒ Object
48 49 50 |
# File 'lib/fluent/plugin/out_mongo.rb', line 48
# Serializes a single event for buffering.
#
# tag   - the event tag (not used here).
# event - an object whose #record responds to #to_msgpack.
#
# Returns the result of calling #to_msgpack on the event's record.
def format(tag, event)
  record = event.record
  record.to_msgpack
end
#shutdown ⇒ Object
41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_mongo.rb', line 41
# Closes the MongoDB connections held by this output and then delegates to
# the parent class's shutdown.
#
# Either collection may be unset (for example when #start never completed,
# or when no capped backup collection is configured); unset collections are
# skipped. The capped connection is closed even if closing the primary
# connection raises, so one failure does not leak the other connection. In
# that case the error still propagates and +super+ is not reached.
#
# Returns whatever the parent #shutdown returns.
def shutdown
  begin
    # Original author's note: Mongo::Connection keeps track of whether it is
    # alive or closed by itself.
    @collection.db.connection.close unless @collection.nil?
  ensure
    @capped.db.connection.close unless @capped.nil?
  end

  super
end
#start ⇒ Object
35 36 37 38 39 |
# File 'lib/fluent/plugin/out_mongo.rb', line 35
# Starts the output: after the parent's start has run, opens a connection to
# @host/@port and keeps a handle on the configured collection in
# @collection. When capped settings were configured (@capped_argument is
# set), @capped is additionally assigned from capped_collection, which is
# defined elsewhere in the plugin.
def start
  super

  connection = Mongo::Connection.new(@host, @port)
  database = connection.db(@database_name)
  @collection = database.collection(@collection_name)

  # No capped settings configured: leave @capped untouched.
  return if @capped_argument.nil?

  @capped = capped_collection
end
#write(chunk) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_mongo.rb', line 52
# Writes one buffered chunk to MongoDB.
#
# Every MessagePack-encoded record in the chunk is decoded and collected.
# If a capped backup collection exists and this batch has not been backed
# up yet, the records are inserted there first and @backuped is set. The
# records are then inserted into the main collection and @backuped is reset.
# If the main insert raises, @backuped stays true, so a later call skips the
# backup insert.
#
# chunk - an object whose #open yields an IO-like stream of MessagePack data.
#
# Returns false (the value assigned to @backuped).
def write(chunk)
  documents = []

  chunk.open do |io|
    begin
      MessagePack::Unpacker.new(io).each do |document|
        documents << document
      end
    rescue EOFError
      # An EOFError is always raised once the end of the chunk is reached;
      # everything decoded up to that point is kept.
    end
  end

  if !@backuped && !@capped.nil?
    @capped.insert(documents)
    @backuped = true
  end

  @collection.insert(documents)
  @backuped = false
end