Class: Fluent::TreasureDataItemOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::TreasureDataItemOutput
- Includes:
- TDPluginUtil
- Defined in:
- lib/fluent/plugin/out_tditem.rb
Constant Summary
collapse
- IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
Instance Method Summary
collapse
#check_table_existence, #parse_bool_parameter, #summarize_record, #upload, #validate_database_and_table_name, #write
Constructor Details
Returns a new instance of TreasureDataItemOutput.
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/fluent/plugin/out_tditem.rb', line 33
# Sets the plugin's built-in defaults.
#
# After calling the superclass initializer this assigns:
# * @auto_create_table - false
# * @tmpdir_prefix     - the frozen string 'tditem-'
# * @key_num_limit     - 1024; format_stream skips records with more keys
# * @record_size_limit - 32 MiB; format_stream warns when a serialized
#                        record exceeds this many bytes
# * @empty_gz_data     - the value of TreasureData::API.create_empty_gz_data
# * @user_agent        - frozen identification string embedding the plugin
#                        version
def initialize
  super
  @auto_create_table = false
  @tmpdir_prefix = 'tditem-'.freeze
  # Each limit is its own statement; running them together on a single line
  # is a syntax error.
  @key_num_limit = 1024
  @record_size_limit = 32 * 1024 * 1024
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td-item: #{TreasureDataPlugin::VERSION}".freeze
end
|
Instance Method Details
#configure(conf) ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/fluent/plugin/out_tditem.rb', line 44
# Applies the plugin configuration on top of the superclass handling.
#
# Steps, in order:
# 1. If the buffer exposes a buffer_chunk_limit= writer and the
#    configuration has no 'buffer_chunk_limit' key, the chunk limit is set
#    to IMPORT_SIZE_LIMIT.
# 2. The database and table names are passed to
#    validate_database_and_table_name.
# 3. @key is set to the frozen string "<database>.<table>".
# 4. A String @use_ssl is converted with parse_bool_parameter.
# 5. The @tmpdir directory is created unless @tmpdir is nil.
#
# @param conf the configuration element; only has_key? is called on it
#   here, and it is also forwarded to the superclass and the validator
# @return nil when @tmpdir is nil, otherwise the result of FileUtils.mkdir_p
def configure(conf)
  super

  # The writer check comes first so conf is consulted only when the limit
  # could actually be applied.
  if @buffer.respond_to?(:buffer_chunk_limit=)
    @buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT unless conf.has_key?('buffer_chunk_limit')
  end

  validate_database_and_table_name(@database, @table, conf)
  @key = "#{@database}.#{@table}".freeze

  if @use_ssl.instance_of?(String)
    @use_ssl = parse_bool_parameter(@use_ssl)
  end

  return if @tmpdir.nil?

  FileUtils.mkdir_p(@tmpdir)
end
|
#emit(tag, es, chain) ⇒ Object
70
71
72
|
# File 'lib/fluent/plugin/out_tditem.rb', line 70
# Hands the event stream to the superclass emit, adding @key (the
# "<database>.<table>" string built in configure) as a fourth argument.
#
# @param tag the event tag, forwarded unchanged
# @param es the event stream, forwarded unchanged
# @param chain the output chain, forwarded unchanged
# @return whatever the superclass emit returns
def emit(tag, es, chain)
  # NOTE(review): presumably the superclass uses this as the buffer chunk
  # key - confirm against the BufferedOutput API.
  chunk_key = @key
  super(tag, es, chain, chunk_key)
end
|
#format_stream(tag, es) ⇒ Object
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/fluent/plugin/out_tditem.rb', line 74
# Serializes every record of an event stream into one MessagePack string.
#
# Records with more than @key_num_limit keys are logged as errors and left
# out of the output. A record whose serialized size exceeds
# @record_size_limit bytes is logged as a warning but is still kept.
#
# @param tag the event tag (not used by this method)
# @param es the event stream; each element is yielded as (time, record)
# @return [String] the concatenated serialized records
def format_stream(tag, es)
  out = ''
  off = out.bytesize
  es.each do |_time, record|
    if record.size > @key_num_limit
      log.error "Too many number of keys (#{record.size} keys)"
      # Skip this record; `next` has to be a statement of its own.
      next
    end

    begin
      record.to_msgpack(out)
    rescue RangeError
      # Retry through the TreasureData API helper when to_msgpack raises
      # RangeError.
      TreasureData::API.normalized_msgpack(record, out)
    end

    # Size of this record is the number of bytes appended since the
    # previous one.
    noff = out.bytesize
    sz = noff - off
    if sz > @record_size_limit
      log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
    end
    off = noff
  end
  out
end
|
#start ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/fluent/plugin/out_tditem.rb', line 58
# Starts the output: runs the superclass start, builds the TreasureData
# client from the configured connection settings, then calls
# check_table_existence for the configured database and table.
#
# @return whatever check_table_existence returns
def start
  super

  options = {
    ssl: @use_ssl,
    http_proxy: @http_proxy,
    user_agent: @user_agent,
    endpoint: @endpoint,
    connect_timeout: @connect_timeout,
    read_timeout: @read_timeout,
    send_timeout: @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, options)

  check_table_existence(@database, @table)
end
|