Class: Fluent::MongoOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
MongoUtil, SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_mongo.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from MongoUtil

#authenticate, included

Constructor Details

#initialize ⇒ MongoOutput

Returns a new instance of MongoOutput.



31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/out_mongo.rb', line 31

# Runs the inherited constructor, loads the gems this output depends on, and
# seeds the instance state with empty containers.
#
# The libraries are required here, at construction time, rather than at the
# top of the file.
def initialize
  super

  require 'mongo'
  require 'msgpack'

  # Collections are not capped unless configuration says otherwise.
  @collection_options = { capped: false }
  @connection_options = {}
  @clients = {}
end

Instance Attribute Details

#collection_options ⇒ Object (readonly)

Returns the value of attribute collection_options.



29
30
31
# File 'lib/fluent/plugin/out_mongo.rb', line 29

# Read-only accessor: returns whatever is currently stored in
# @collection_options (the object itself, not a copy).
def collection_options
  @collection_options
end

Class Method Details

.format_nocache(time) ⇒ Object

MongoDB uses BSON’s Date for time.



69
70
71
# File 'lib/fluent/plugin/out_mongo.rb', line 69

# Singleton method defined on the object held in @timef: returns the given
# time untouched instead of formatting it, because MongoDB uses BSON's Date
# for time.
#
# @param time the time value handed to the formatter
# @return the same +time+ object, unchanged
def @timef.format_nocache(time)
  time
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_mongo.rb', line 41

# Applies the plugin configuration on top of the inherited setup.
#
# In order, this method:
# * enables tag-mapped mode when the 'tag_mapped' key is present and fills in
#   a default for @disable_collection_check if it is still nil;
# * compiles the optional 'remove_tag_prefix' value into a Regexp anchored at
#   the start of the string;
# * splits @exclude_broken_fields on commas when it is set;
# * fills in the capped-collection options when 'capped' is present;
# * copies @safe into the connection options;
# * overrides @timef.format_nocache so times pass through unformatted.
#
# @param conf [#has_key?, #[]] configuration element for this output
# @raise [ConfigError] when neither 'tag_mapped' nor 'collection' is given,
#   or when 'capped' is given without 'capped_size'
def configure(conf)
  super

  if conf.has_key?('tag_mapped')
    @tag_mapped = true
    @disable_collection_check = true if @disable_collection_check.nil?
  else
    @disable_collection_check = false if @disable_collection_check.nil?
  end
  raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped && !conf.has_key?('collection')

  if remove_tag_prefix = conf['remove_tag_prefix']
    # \A anchors at the very beginning of the tag. '^' would also match right
    # after any newline inside the string, so it is not a true prefix anchor.
    @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(remove_tag_prefix))
  end

  @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields

  # capped configuration
  if conf.has_key?('capped')
    raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
    @collection_options[:capped] = true
    @collection_options[:size] = Config.size_value(conf['capped_size'])
    @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
  end

  @connection_options[:safe] = @safe

  # MongoDB uses BSON's Date for time.
  # NOTE(review): if @timef were nil here, this would define the method on
  # nil's singleton class (NilClass) - confirm @timef is always set by now.
  def @timef.format_nocache(time)
    time
  end

  $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
end

#emit(tag, es, chain) ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_mongo.rb', line 100

# Forwards the event stream to the inherited emit. In tag-mapped mode the tag
# is passed a second time as an extra fourth argument; otherwise only the
# three received arguments are forwarded.
#
# TODO: consider choosing the implementation once in configure (e.g. via
# eval) instead of branching on every call.
def emit(tag, es, chain)
  return super(tag, es, chain, tag) if @tag_mapped

  super(tag, es, chain)
end

#format(tag, time, record) ⇒ Object



96
97
98
# File 'lib/fluent/plugin/out_mongo.rb', line 96

# Serializes one event as the result of calling to_msgpack on the
# two-element array [time, record]. The tag is accepted but is not part of
# the serialized payload.
def format(tag, time, record)
  entry = [time, record]
  entry.to_msgpack
end

#shutdown ⇒ Object



90
91
92
93
94
# File 'lib/fluent/plugin/out_mongo.rb', line 90

# Closes the connection behind every client held in @clients, then runs the
# inherited shutdown and returns its result.
def shutdown
  # Mongo::Connection itself checks whether it is alive or already closed.
  @clients.values.each do |client|
    connection = client.db.connection
    connection.close
  end

  super
end

#start ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_mongo.rb', line 76

# Prepares the output and then runs the inherited start.
#
# Outside tag-mapped mode the target collection is known up front, so it is
# looked up (or created) before anything else. The buffer's chunk limit is
# then set from available_buffer_chunk_limit when the buffer exposes
# :buffer_chunk_limit; otherwise a warning is logged.
def start
  unless @tag_mapped
    get_or_create_collection(@collection)
  end

  # Done here rather than in configure to avoid complex method dependencies.
  chunk_limit_supported = @buffer.respond_to?(:buffer_chunk_limit)
  @buffer.buffer_chunk_limit = available_buffer_chunk_limit if chunk_limit_supported
  $log.warn "#{Fluent::VERSION} does not have :buffer_chunk_limit. Be careful when insert large documents to MongoDB" unless chunk_limit_supported

  super
end

#write(chunk) ⇒ Object



109
110
111
112
113
# File 'lib/fluent/plugin/out_mongo.rb', line 109

# Writes one buffered chunk: resolves the target collection name (the chunk's
# key in tag-mapped mode, @collection otherwise), obtains that collection,
# gathers the chunk's records, and hands both to operate.
#
# TODO: this per-call @tag_mapped branch mirrors the one in #emit; see the
# TODO there.
def write(chunk)
  target = if @tag_mapped
             chunk.key
           else
             @collection
           end

  collection = get_or_create_collection(target)
  records = collect_records(chunk)
  operate(collection, records)
end