Class: Fluent::Cloudfront_LogInput
- Inherits: Input
  - Ancestry: Object > Input > Fluent::Cloudfront_LogInput
- Defined in:
- lib/fluent/plugin/in_cloudfront_log.rb
Defined Under Namespace
Classes: TimerWatcher
Instance Method Summary
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #initialize ⇒ Cloudfront_LogInput (constructor): Returns a new instance of Cloudfront_LogInput.
- #input ⇒ Object
- #parse_header(line) ⇒ Object
- #process_content(content) ⇒ Object
- #purge(filename) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ Cloudfront_LogInput
Returns a new instance of Cloudfront_LogInput.
20 21 22 23 24 25 26 27 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 20
# Constructs the plugin instance.
#
# After the parent constructor runs, the libraries this plugin depends on are
# loaded here rather than at file load time, so they are only pulled in when
# the plugin is actually instantiated.
def initialize
  super
  %w[logger aws-sdk zlib time uri].each { |library| require library }
end
Instance Method Details
#client ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 69
# Returns the plugin's S3 client, building it on first use and reusing it afterwards.
#
# The client is configured with @region. Explicit credentials are passed only
# when both @aws_key_id and @aws_sec_key are set; otherwise the AWS SDK resolves
# credentials on its own.
#
# @return [Aws::S3::Client, nil] the memoized client, or nil if construction
#   failed. A failure is logged as a warning and retried on the next call,
#   because @client stays nil.
def client
  @client ||= begin
    options = {:region => @region}
    if @aws_key_id && @aws_sec_key
      options[:access_key_id] = @aws_key_id
      options[:secret_access_key] = @aws_sec_key
    end
    Aws::S3::Client.new(options)
  end
rescue => e
  log.warn("S3 client error. #{e.message}")
  nil
end
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 29
# Validates the plugin configuration and fills in defaults for where processed
# logs are moved.
#
# @param conf the Fluentd configuration element, handed to the parent's configure.
# @raise [Fluent::ConfigError] if log_bucket, region or log_prefix is missing;
#   the message names the missing parameter.
def configure(conf)
  super

  raise Fluent::ConfigError, "'log_bucket' parameter is required" unless @log_bucket
  raise Fluent::ConfigError, "'region' parameter is required" unless @region
  raise Fluent::ConfigError, "'log_prefix' parameter is required" unless @log_prefix

  # Processed logs go to the source bucket under "<log_prefix>_moved" unless
  # a destination was configured explicitly.
  @moved_log_bucket ||= @log_bucket
  @moved_log_prefix ||= @log_prefix + '_moved'

  if @verbose
    log.info("@log_bucket: #{@log_bucket}")
    log.info("@moved_log_bucket: #{@moved_log_bucket}")
    log.info("@log_prefix: #{@log_prefix}")
    log.info("@moved_log_prefix: #{@moved_log_prefix}")
    log.info("@thread_num: #{@thread_num}")
  end
end
#input ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 150
# One polling pass: lists log objects under @log_prefix in @log_bucket and
# processes them with a pool of @thread_num worker threads.
#
# At most @s3_get_max keys are listed per pass (a single list_objects call).
# A listing failure is logged and the pass is skipped. A failure while
# processing one object is logged and does not stop the other objects or
# workers. The method returns only after every worker has finished.
def input
  log.info("CloudFront Begining input going to list S3")
  begin
    s3_list = client.list_objects(:bucket => @log_bucket, :prefix => @log_prefix,
                                  :delimiter => @delimiter, :max_keys => @s3_get_max)
  rescue => e
    log.warn("S3 GET list error. #{e.message}")
    return
  end
  log.info("Finished S3 get list")

  queue = Queue.new
  log.debug("S3 List size: #{s3_list.contents.length}")
  s3_list.contents.each { |content| queue << content }

  # BEGINS THREADS
  threads = Array.new(@thread_num) do
    Thread.new do
      until queue.empty?
        work_unit = begin
          queue.pop(true)
        rescue ThreadError
          # Another worker drained the queue between empty? and pop.
          nil
        end
        next unless work_unit

        begin
          process_content(work_unit)
        rescue => e
          # Without this, the worker dies and Thread#join below re-raises,
          # aborting the whole pass.
          log.warn("CloudFront failed to process #{work_unit.key}: #{e.class}: #{e.message}")
        end
      end
    end
  end

  log.debug("CloudFront Waiting for Threads to finish...")
  threads.each(&:join)
  log.debug("CloudFront Finished")
end
#parse_header(line) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 82
# Records a CloudFront log header directive.
#
# A "#Version:" line stores the trimmed version text in @version. A "#Fields:"
# line stores the space-separated field names in @fields. Any other line, or a
# directive with nothing after the colon, is ignored.
#
# @param line [String] a header line from the access log
# @return [String, Array<String>, nil] the stored value, or nil if nothing matched
def parse_header(line)
  if line =~ /^#Version:.+/i
    @version = line.sub(/^#Version:/i, '').strip
  elsif line =~ /^#Fields:.+/i
    names = line.sub(/^#Fields:/i, '').strip
    @fields = names.split("\s")
  end
end
#process_content(content) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 121
# Downloads one gzipped CloudFront access log, emits one event per log line,
# and then hands the file to #purge.
#
# Keys ending in "/" and keys not ending in "gz" are skipped. Header lines
# (starting with "#") go to #parse_header, which sets @fields. Every other
# non-empty line is tab-split, zipped with @fields into a record, and emitted
# under @tag with the UTC time built from its "date" and "time" fields. Values
# are percent-decoded twice because the logs double-encode them
# (hoge%2520fuga -> hoge%20fuga -> hoge fuga).
#
# @param content an S3 listing entry responding to #key
def process_content(content)
  # Escape the prefix so characters such as "." match literally, and anchor at
  # the start of the key.
  filename = content.key.sub(/\A#{Regexp.escape(@log_prefix)}\//, "")
  log.info("CloudFront Currently processing: #{filename}") if @verbose
  return if filename.end_with?('/') # skip directory/
  return unless filename.end_with?('gz') # skip without gz file

  begin
    access_log_gz = client.get_object(:bucket => @log_bucket, :key => content.key).body
    access_log = Zlib::GzipReader.new(access_log_gz).read
  rescue => e
    log.warn("S3 GET client error. #{e.message}")
    return
  end

  access_log.split("\n").each do |line|
    next if line.empty? # a blank line would otherwise become an all-nil record
    if line.start_with?('#')
      parse_header(line)
      next
    end
    line = percent_unescape(line) # hoge%2520fuga -> hoge%20fuga
    line = percent_unescape(line) # hoge%20fuga -> hoge fuga
    record = Hash[@fields.zip(line.split("\t"))]
    timestamp = Time.parse("#{record['date']}T#{record['time']}+00:00").to_i
    router.emit(@tag, timestamp, record)
  end

  purge(filename)
end

# Decodes every %XX escape in +str+ to the byte 0xXX, keeping +str+'s encoding
# and leaving "+" untouched. This matches the behaviour of URI.unescape, which
# was removed in Ruby 3.0.
def percent_unescape(str)
  str.gsub(/%\h\h/) { |escape| [escape[1, 2]].pack('H2').force_encoding(str.encoding) }
end
private :percent_unescape
#purge(filename) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 91
# Moves a processed log out of the input prefix: copies
# @log_bucket/@log_prefix/<filename> to @moved_log_bucket/@moved_log_prefix/<filename>,
# then deletes the original.
#
# If the copy fails, a warning is logged and the delete is skipped, so the
# source object is never removed without a copy. A delete failure is logged
# and otherwise ignored.
#
# @param filename [String] object key relative to @log_prefix
def purge(filename)
  # Key is the name of the object without the bucket prefix, e.g: asdf/asdf.jpg
  source_object_key = [@log_prefix, filename].join('/')
  # Full path includes bucket name in addition to object key, e.g: bucket/asdf/asdf.jpg
  source_object_full_path = [@log_bucket, source_object_key].join('/')

  dest_object_key = [@moved_log_prefix, filename].join('/')
  dest_object_full_path = [@moved_log_bucket, dest_object_key].join('/')

  log.info("Copying object: #{source_object_full_path} to #{dest_object_full_path}") if @verbose
  begin
    client.copy_object(:bucket => @moved_log_bucket, :copy_source => source_object_full_path, :key => dest_object_key)
  rescue => e
    log.warn("S3 Copy client error. #{e.message}")
    return
  end

  log.info("Deleting object: #{source_object_key} from #{@log_bucket}") if @verbose
  begin
    client.delete_object(:bucket => @log_bucket, :key => source_object_key)
  rescue => e
    log.warn("S3 Delete client error. #{e.message}")
  end
end
#run ⇒ Object
65 66 67 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 65
# Body of the background thread created in #start: runs the Coolio event loop
# until it stops.
#
# Any StandardError escaping the loop is written to the plugin log with its
# backtrace instead of ending the thread with no trace in the Fluentd log.
def run
  @loop.run
rescue => e
  log.error("unexpected error in CloudFront input loop", :error => e)
  log.error_backtrace
end
#shutdown ⇒ Object
60 61 62 63 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 60
# Stops the polling loop started in #start, waits for its thread to finish,
# and then runs the parent's shutdown (start likewise calls super).
def shutdown
  # Detach the watchers so the loop exits right away instead of waiting for
  # the next timer tick.
  @loop.watchers.each { |watcher| watcher.detach }
  begin
    @loop.stop
  rescue RuntimeError
    # Coolio raises "loop not running" once the loop has already exited
    # (for example after all watchers were detached); nothing left to stop.
  end
  @thread.join
  super
end
#start ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/in_cloudfront_log.rb', line 48
# Starts the plugin after the parent's start.
#
# Builds the S3 client up front, so a construction failure is logged at
# startup. Then attaches a TimerWatcher that calls #input every @interval
# (presumably seconds; the `true` argument presumably makes it repeat —
# confirm against TimerWatcher) to a fresh Coolio loop, and runs that loop on a
# background thread via #run.
def start
  super
  log.info("Cloudfront verbose logging enabled") if @verbose
  client

  timer = TimerWatcher.new(@interval, true, log, &method(:input))
  @loop = Coolio::Loop.new
  @loop.attach(timer)

  @thread = Thread.new { run }
end