Class: Fluent::S3Output

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_s3.rb

Instance Method Summary collapse

Constructor Details

#initializeS3Output

Returns a new instance of S3Output.



7
8
9
10
11
12
13
# File 'lib/fluent/plugin/out_s3.rb', line 7

def initialize
  super
  require 'aws-sdk'
  require 'zlib'
  require 'time'
  require 'tempfile'
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
# File 'lib/fluent/plugin/out_s3.rb', line 23

def configure(conf)
  super

  @timef = TimeFormatter.new(@time_format, @localtime)
end

#format(tag, time, record) ⇒ Object



41
42
43
44
# File 'lib/fluent/plugin/out_s3.rb', line 41

def format(tag, time, record)
  time_str = @timef.format(time)
  "#{time_str}\t#{tag}\t#{Yajl.dump(record)}\n"
end

#startObject



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/out_s3.rb', line 29

def start
  super
  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
  @s3 = AWS::S3.new(options)
  @bucket = @s3.buckets[@s3_bucket]
end

#write(chunk) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/out_s3.rb', line 46

def write(chunk)
  i = 0
  begin
    s3path = "#{@path}#{chunk.key}_#{i}.gz"
    i += 1
  end while @bucket.objects[s3path].exists?

  tmp = Tempfile.new("s3-")
  w = Zlib::GzipWriter.new(tmp)
  begin
    chunk.write_to(w)
    w.close
    @bucket.objects[s3path].write(Pathname.new(tmp.path))
  ensure
    w.close rescue nil
  end
end