Module: Fluent::MongoTypedMixin

Included in:
MongoOutputReplsetTyped, MongoOutputTyped
Defined in:
lib/fluent/mixin/mongo_typed_mixin.rb

Constant Summary collapse

# Type names that #configure accepts in the 'item_types' parameter and that
# #cast has a conversion branch for. Frozen so the list cannot be mutated.
ITEM_TYPES =
  %w[string integer bool time array].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#item_types ⇒ Object

Returns the value of attribute item_types.



7
8
9
# File 'lib/fluent/mixin/mongo_typed_mixin.rb', line 7

# Returns @item_types, the key-to-type-name Hash that #configure builds from
# the 'item_types' setting and that #cast consults for each record key.
def item_types
  @item_types
end

#time_format ⇒ Object

Returns the value of attribute time_format.



7
8
9
# File 'lib/fluent/mixin/mongo_typed_mixin.rb', line 7

# Returns @time_format, which #configure reads from conf['time_format'] and
# #cast hands to Time.strptime for keys typed as 'time'.
def time_format
  @time_format
end

Instance Method Details

#cast(key, value) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/mixin/mongo_typed_mixin.rb', line 34

# Converts a single record field according to the type configured for its key.
#
# key   - field name; looked up in @item_types to find the target type.
# value - raw field value to convert.
#
# Returns a two-element Array [key, converted_value]. The value is passed
# through unchanged when the key has no configured type.
def cast(key, value)
  type = @item_types[key]
  converted =
    if type == 'string'
      value.to_s
    elsif type == 'integer'
      value.to_i
    elsif type == 'bool'
      Config.bool_value(value)
    elsif type == 'time'
      Time.strptime(value, @time_format)
    elsif type == 'array'
      # Comma-separated list; whitespace around each comma is discarded.
      value.split(/\s*,\s*/)
    else
      value
    end
  [key, converted]
end

#collect_records(chunk) ⇒ Object



24
25
26
27
28
29
30
31
32
# File 'lib/fluent/mixin/mongo_typed_mixin.rb', line 24

# Builds the list of records held in a buffer chunk, with each field run
# through #cast.
#
# chunk - object responding to #msgpack_each, yielding (time, record) pairs.
#
# When @include_time_key is set, each record's @time_key entry is replaced by
# Time.at of the event time, falling back to the record's own value for that
# key when the event time is nil.
#
# Returns an Array of Hashes, in the order the chunk yielded them.
def collect_records(chunk)
  collected = []
  chunk.msgpack_each do |time, record|
    # #cast returns [key, value] pairs, which Hash[] turns back into a Hash.
    typed = Hash[record.map { |key, value| cast(key, value) }]
    if @include_time_key
      typed[@time_key] = Time.at(time || typed[@time_key])
    end
    collected << typed
  end
  collected
end

#configure(conf) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/fluent/mixin/mongo_typed_mixin.rb', line 9

# Reads the typed-field settings from the configuration.
#
# conf - configuration object indexed by parameter name. 'item_types' must be
#        a "KEY:TYPE,..." string; 'time_format' is stored as given.
#
# Sets @item_types (Hash of key => type name) and @time_format.
#
# Raises ConfigError when 'item_types' is missing or contains a TYPE that is
# not listed in ITEM_TYPES.
def configure(conf)
  super

  format_error = "mongo_ex: 'item_types' parameter format is \"KEY:TYPE,...\"\nTYPE is #{ITEM_TYPES.join(', ')}"

  spec = conf['item_types']
  # A missing parameter used to surface as NoMethodError on nil; report it
  # as a configuration error like any other malformed value.
  raise ConfigError, format_error if spec.nil?

  pairs = spec.split(',').map do |entry|
    key, type = entry.split(/:/)
    raise ConfigError, format_error unless ITEM_TYPES.include?(type)
    [key, type]
  end
  @item_types = Hash[pairs]
  @time_format = conf['time_format']
end