Class: Fluent::GStoreOutput
- Inherits:
  - TimeSlicedOutput
- Inheritance chain: Object → TimeSlicedOutput → Fluent::GStoreOutput
- Defined in:
- lib/fluent/plugin/out_gstore.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #exists?(source) ⇒ Boolean
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ GStoreOutput (constructor): A new instance of GStoreOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GStoreOutput
Returns a new instance of GStoreOutput.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/fluent/plugin/out_gstore.rb', line 7
#
# Builds the plugin instance: runs the parent initializer, then loads the
# libraries the plugin uses. The requires live inside the constructor, so
# they are only evaluated when an instance is actually created.
def initialize
  super
  %w[gstore kconv zlib time tempfile pathname rexml/document].each do |library|
    require library
  end
end
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 |
# File 'lib/fluent/plugin/out_gstore.rb', line 25
#
# Applies the configuration through the parent class, then builds the time
# formatter used by #format from @time_format and @localtime.
#
# @param conf the configuration object, forwarded unchanged to the parent
def configure(conf)
  super(conf)
  formatter = TimeFormatter.new(@time_format, @localtime)
  @timef = formatter
end
#exists?(source) ⇒ Boolean
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_gstore.rb', line 44
#
# Decides, from a fetched response body, whether an object is already stored.
#
# The body is handed to REXML: if it parses as an XML document the method
# answers false, and if parsing fails it answers true.
# NOTE(review): presumably the store answers a missing key with an XML error
# document while a real object is gzip data that is not XML — confirm against
# the gstore client.
#
# Only StandardError is rescued (REXML::ParseException is one of its
# descendants), so Interrupt, SystemExit and NoMemoryError propagate instead
# of being reported as "the object exists".
#
# @param source the response body to inspect
# @return [Boolean] false when +source+ parses as XML, true when parsing fails
def exists?(source)
  REXML::Document.new(source)
  # TODO: carried over from the original listing, which left this unspecified.
  false
rescue StandardError
  true
end
#format(tag, time, record) ⇒ Object
39 40 41 42 |
# File 'lib/fluent/plugin/out_gstore.rb', line 39
#
# Serializes one event as a single tab-separated, newline-terminated line:
# formatted time, tag, then the record as JSON.
#
# @param tag the event tag, interpolated as-is
# @param time the event time, rendered through @timef
# @param record the event payload; must respond to #to_json
# @return [String] "<time>\t<tag>\t<json>\n"
def format(tag, time, record)
  stamp = @timef.format(time)
  payload = record.to_json
  "#{stamp}\t#{tag}\t#{payload}\n"
end
#start ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_gstore.rb', line 30
#
# Starts the output: runs the parent start hook, then creates the storage
# client stored in @gstore.
#
# The credentials hash is built from @gstore_key_id and @gstore_sec_key and
# passed to the client constructor. The listing had lost the local variable
# name, leaving `super = { ... }` (not valid Ruby) and a client created with
# no credentials.
# NOTE(review): the variable name `options` is a reconstruction — confirm
# against the plugin source.
def start
  super
  options = {
    :access_key => @gstore_key_id,
    :secret_key => @gstore_sec_key
  }
  @gstore = GStore::Client.new(options)
end
#write(chunk) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_gstore.rb', line 56
#
# Uploads one buffered chunk as a gzip object.
#
# The object name is "#{@path}#{chunk.key}_#{i}.gz", where i starts at 0 and
# is increased until #exists? answers false for the body fetched at that
# name. The chunk is then gzipped into a temporary file and the file's bytes
# are sent with put_object.
# NOTE(review): every probe fetches the candidate object through get_object;
# a cheaper existence check may exist in the client — not visible from here.
#
# The temporary file is switched to binary mode and read back with
# File.binread so the gzip bytes are never passed through text-mode
# translation, and it is closed and unlinked on every exit path instead of
# being left for the garbage collector.
#
# @param chunk buffer chunk; must respond to #key and #write_to(io)
# @return whatever @gstore.put_object returns
def write(chunk)
  i = 0
  gstorepath = nil
  loop do
    gstorepath = "#{@path}#{chunk.key}_#{i}.gz"
    break unless exists?(@gstore.get_object(@gstore_bucket, gstorepath))
    i += 1
  end

  tmp = Tempfile.new("gstore-")
  begin
    tmp.binmode
    w = Zlib::GzipWriter.new(tmp)
    begin
      chunk.write_to(w)
      # Closing the writer flushes the gzip trailer and closes tmp as well.
      w.close
    ensure
      begin
        w.close unless w.closed?
      rescue StandardError
        # Best-effort cleanup: do not mask the error that brought us here.
      end
    end

    @gstore.put_object(
      @gstore_bucket,
      gstorepath,
      :data => File.binread(tmp.path))
  ensure
    # Close (no-op if already closed) and delete the temporary file.
    tmp.close(true)
  end
end