Class: Fluent::KinesisFirehoseOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_kinesis_firehose.rb

Constant Summary collapse

# Plugin name and version joined by a slash. Where this string is consumed is
# not visible on this page — presumably appended to the AWS client user agent;
# confirm against the client setup code.
USER_AGENT_SUFFIX =
"fluent-plugin-kinesis-firehose/#{FluentPluginKinesisFirehose::VERSION}"
# Upper bound on the number of converted records handed to a single
# put_records call; #write slices its output by this count.
PUT_RECORDS_MAX_COUNT =
500
# Evaluates to 4_194_304 (4 * 1024 * 1024). Presumably a per-request payload
# limit in bytes — it is not referenced by any method shown on this page.
PUT_RECORDS_MAX_DATA_SIZE =
1024 * 1024 * 4

Instance Method Summary collapse

Constructor Details

#initialize ⇒ KinesisFirehoseOutput

Returns a new instance of KinesisFirehoseOutput.



28
29
30
31
32
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 28

# Builds the output plugin instance.
#
# Runs the parent initializer first, then loads the libraries this plugin
# depends on. The requires happen at instantiation time rather than at file
# load time.
def initialize
  super()
  %w[aws-sdk multi_json].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



34
35
36
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 34

# Applies the plugin configuration.
#
# Adds no behavior of its own: the configuration is forwarded unchanged to the
# parent implementation and that implementation's result is returned.
#
# @param conf the configuration object supplied by the caller
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 38

# Serializes one event for buffering.
#
# The tag, time and record are packed together as a three-element array and
# encoded by calling +to_msgpack+ on it; #write later reads events back in the
# same [tag, time, record] order.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the value produced by Array#to_msgpack for [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#write(chunk) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_kinesis_firehose.rb', line 42

# Sends the events of a buffer chunk through put_records in batches.
#
# Steps:
# 1. Iterate the chunk's events via +msgpack_each+ as [tag, time, record].
# 2. When @data_key is set, drop every event whose record has no truthy value
#    under that key (nil and false both count as missing) and log a warning
#    for each dropped event. When @data_key is unset, keep everything.
# 3. Turn each remaining record into its payload with convert_record_to_data.
# 4. Hand the payloads to put_records in slices of at most
#    PUT_RECORDS_MAX_COUNT entries.
#
# @param chunk an object responding to +msgpack_each+
def write(chunk)
  events = chunk.to_enum(:msgpack_each)

  deliverable = events.select do |tag, time, record|
    next true if !@data_key || record[@data_key]

    log.warn("'#{@data_key}' key does not exist: #{[tag, time, record].inspect}")
    false
  end

  payloads = deliverable.map { |_tag, _time, record| convert_record_to_data(record) }

  payloads.each_slice(PUT_RECORDS_MAX_COUNT) { |data_list| put_records(data_list) }
end