Class: LogStash::Outputs::S3

Inherits:
Base show all
Defined in:
lib/logstash/outputs/s3.rb

Overview

LET’S ROCK AND ROLL ON THE CODE!

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Class Method Details

.format_message(event) ⇒ Object



346
347
348
349
350
351
352
# File 'lib/logstash/outputs/s3.rb', line 346

def self.format_message(event)
   message = "Date: #{event["@timestamp"]}\n"
   message << "Source: #{event["source"]}\n"
   message << "Tags: #{event["tags"].join(', ')}\n"
   message << "Fields: #{event.to_hash.inspect}\n"
   message << "Message: #{event["message"]}"
end

Instance Method Details

#aws_s3_configObject

Method to set up the aws configuration and establish connection



152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash/outputs/s3.rb', line 152

def aws_s3_config

 @endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com'

 @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)

 AWS.config(
   :access_key_id => @access_key_id,
   :secret_access_key => @secret_access_key,
   :s3_endpoint => @endpoint_region
 )
 @s3 = AWS::S3.new 

end

#getFinalPathObject

this method is used for create new path for name the file



203
204
205
206
207
208
# File 'lib/logstash/outputs/s3.rb', line 203

def getFinalPath
  
  @pass_time = Time.now 
  return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M")

end

#newFile(flag) ⇒ Object

This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/logstash/outputs/s3.rb', line 237

def newFile (flag)
 
  if (flag == true)
    @current_final_path = getFinalPath
    @sizeCounter = 0
  end

  if (@tags.size != 0)
    @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w")
  else
    @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w")
  end

end

#receive(event) ⇒ Object



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/logstash/outputs/s3.rb', line 295

def receive(event)
 return unless output?(event)
  
 # Prepare format of Events 
 if (@format == "plain")
    message = self.class.format_message(event)
 elsif (@format == "json")
    message = event.to_json
 else
    message = event.to_s
 end
 
 if(time_file !=0)
    @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
 end

 # if specific the size
 if(size_file !=0)
   
   if (@tempFile.size < @size_file )

      @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
      @logger.debug "S3: put event into: "+File.basename(@tempFile)

      # Put the event in the file, now! 
      File.open(@tempFile, 'a') do |file|
        file.puts message
        file.write "\n"
      end

    else

      @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file"
      upFile(false, File.basename(@tempFile))
      @sizeCounter += 1
      newFile(false)

    end
    
 # else we put all in one file 
 else

   @logger.debug "S3: put event into "+File.basename(@tempFile)
   File.open(@tempFile, 'a') do |file|
     file.puts message
     file.write "\n"
   end
 end
   
end

#registerObject



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/logstash/outputs/s3.rb', line 253

def register
  require "aws-sdk"
  @temp_directory = "/opt/logstash/S3_temp/"

  if (@tags.size != 0)
      @tag_path = ""
      for i in (0..@tags.size-1)
         @tag_path += @tags[i].to_s+"." 
      end
  end

  if !(File.directory? @temp_directory)
   @logger.debug "S3: Directory "+@temp_directory+" doesn't exist, let's make it!"
   Dir.mkdir(@temp_directory)
  else
   @logger.debug "S3: Directory "+@temp_directory+" exist, nothing to do"
  end 
  
  if (@restore == true )
    @logger.debug "S3: is attempting to verify previous crashes..."
  
    upFile(true, "*.txt")    
  end
  
  newFile(true)
  
  if (time_file != 0)
     first_time = true
     @thread = time_alert(@time_file*60) do
      if (first_time == false)
        @logger.debug "S3: time_file triggered,  let's bucket the file if dosen't empty  and create new file "
        upFile(false, File.basename(@tempFile))
        newFile(true)
      else
        first_time = false
      end
    end
  end

end

#restoreObject

IMPORTANT: if you use multiple instance of s3, you should specify on one of them the “restore=> true” and on the others “restore => false”. This is hack for not destroy the new files after restoring the initial files. If you do not specify “restore => true” when logstash crashes or is restarted, the files are not sent into the bucket, for example if you have single Instance.



145
# File 'lib/logstash/outputs/s3.rb', line 145

config :restore, :validate => :boolean, :default => false

#time_alert(interval) ⇒ Object

This method is used to manage sleep and awaken thread.



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/logstash/outputs/s3.rb', line 168

def time_alert(interval)

  Thread.new do
   loop do
     start_time = Time.now
     yield
     elapsed = Time.now - start_time
     sleep([interval - elapsed, 0].max)
   end
  end

end

#upFile(flag, name) ⇒ Object

This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/logstash/outputs/s3.rb', line 212

def upFile(flag, name)
  
  Dir[@temp_directory+name].each do |file|
    name_file = File.basename(file)
   
    if (flag == true)
     @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!"
    end
   
    if (!File.zero?(file))  
      write_on_bucket(file, name_file)

      if (flag == true)
         @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket
      else
         @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket
      end
    end

    File.delete (file)

  end
end

#write_on_bucket(file_data, file_basename) ⇒ Object

this method is used for write files on bucket. It accept the file and the name of file.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/logstash/outputs/s3.rb', line 182

def write_on_bucket (file_data, file_basename)

 # if you lose connection with s3, bad control implementation.
 if ( @s3 == nil) 
   aws_s3_config
 end

 # find and use the bucket
 bucket = @s3.buckets[@bucket]

 @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"

 # prepare for write the file
 object = bucket.objects[file_basename]
 object.write(:file => file_data, :acl => @canned_acl)

 @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""

end