Class: Fluent::TreasureDataItemOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
TDPluginUtil
Defined in:
lib/fluent/plugin/out_tditem.rb

Constant Summary collapse

IMPORT_SIZE_LIMIT =
32 * 1024 * 1024

Instance Method Summary collapse

Methods included from TDPluginUtil

#check_table_existence, #parse_bool_parameter, #summarize_record, #upload, #validate_database_and_table_name, #write

Constructor Details

#initialize ⇒ TreasureDataItemOutput

Returns a new instance of TreasureDataItemOutput.



33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_tditem.rb', line 33

# Creates the plugin instance and seeds its built-in defaults.
#
# The parent initializer runs first; afterwards the instance variables
# below are assigned their default values.
def initialize
  super

  # Per-record limits consulted by #format_stream.
  @key_num_limit = 1024 # Item table default limitation
  @record_size_limit = 32 * 1024 * 1024 # TODO

  @auto_create_table = false
  @tmpdir_prefix = 'tditem-'.freeze
  @user_agent = "fluent-plugin-td-item: #{TreasureDataPlugin::VERSION}".freeze

  # Whatever TreasureData::API.create_empty_gz_data returns is kept as-is
  # (presumably an empty gzip payload -- its definition is not in this file).
  @empty_gz_data = TreasureData::API.create_empty_gz_data
end

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_tditem.rb', line 44

# Applies the configuration to this plugin instance.
#
# After the parent class has processed +conf+, this method:
# * sets the buffer's chunk limit to IMPORT_SIZE_LIMIT when the buffer has a
#   +buffer_chunk_limit=+ setter and +conf+ has no 'buffer_chunk_limit' key,
# * validates the database and table names,
# * stores the frozen "database.table" string in @key,
# * converts @use_ssl through parse_bool_parameter when it is a String,
# * creates the @tmpdir directory when @tmpdir is not nil.
#
# @param conf [#has_key?] the configuration object handed to the plugin
def configure(conf)
  super

  # overwrite default value of buffer_chunk_limit
  if @buffer.respond_to?(:buffer_chunk_limit=)
    @buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT unless conf.has_key?('buffer_chunk_limit')
  end

  validate_database_and_table_name(@database, @table, conf)
  @key = "#{@database}.#{@table}".freeze

  if @use_ssl.instance_of?(String)
    @use_ssl = parse_bool_parameter(@use_ssl)
  end

  unless @tmpdir.nil?
    FileUtils.mkdir_p(@tmpdir)
  end
end

#emit(tag, es, chain) ⇒ Object



70
71
72
# File 'lib/fluent/plugin/out_tditem.rb', line 70

# Hands the event stream to the parent implementation, supplying @key
# (the "database.table" string built in #configure) as the fourth argument.
#
# @param tag [Object] passed through unchanged
# @param es [Object] passed through unchanged
# @param chain [Object] passed through unchanged
def emit(tag, es, chain)
  key = @key
  super(tag, es, chain, key)
end

#format_stream(tag, es) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_tditem.rb', line 74

# Serializes every record of an event stream into a single string.
#
# Each record is appended to the output with +to_msgpack+. Records with more
# than @key_num_limit keys are logged as errors and skipped. When
# +to_msgpack+ raises RangeError, the record is serialized through
# TreasureData::API.normalized_msgpack instead. A record whose serialized
# size exceeds @record_size_limit is still kept; only a warning is logged.
#
# @param tag [Object] not used by this method
# @param es [#each] yields (time, record) pairs; the time is not used
# @return [String] the concatenated serialized records
def format_stream(tag, es)
  out = ''
  off = out.bytesize # byte offset where the current record starts
  es.each do |_time, record|
    if record.size > @key_num_limit
      log.error "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
      next
    end

    begin
      record.to_msgpack(out)
    rescue RangeError
      # to_msgpack may already have appended part of this record before it
      # raised. Cut the buffer back to the last record boundary so the
      # fallback does not leave a truncated record in the stream. byteslice
      # is used because +off+ is a byte offset, not a character offset.
      out = out.byteslice(0, off)
      TreasureData::API.normalized_msgpack(record, out)
    end

    noff = out.bytesize
    sz = noff - off
    if sz > @record_size_limit
      # TODO don't raise error
      #raise "Size of a record too large (#{sz} bytes)"  # TODO include summary of the record
      log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
    end
    off = noff
  end
  out
end

#start ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_tditem.rb', line 58

# Starts the plugin: runs the parent's start, builds the TreasureData client
# from the configured connection settings, then calls check_table_existence
# for the configured database and table.
def start
  super

  # Options hash handed to the client together with the API key.
  options = {
    ssl: @use_ssl,
    http_proxy: @http_proxy,
    user_agent: @user_agent,
    endpoint: @endpoint,
    connect_timeout: @connect_timeout,
    read_timeout: @read_timeout,
    send_timeout: @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, options)

  check_table_existence(@database, @table)
end